Skip to content

Commit 73eff08

Browse files
authored
배포 v1.6.0
배포 v1.6.0
2 parents 21a8e2d + 9e5554a commit 73eff08

File tree

8 files changed

+378
-110
lines changed

8 files changed

+378
-110
lines changed

MLOps/app/main.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ async def root():
7070
},
7171
"description": "OpenAI GPT 기반 데이트 코스 추천 챗봇",
7272
"status": "active" if hasattr(chatbot, 'langchain_agent_service') else "inactive"
73+
},
74+
"crowd": {
75+
"endpoint": "/api/crowd",
76+
"description": "혼잡도 예측 API",
77+
"status": "active"
7378
}
7479
},
7580
"documentation": "/docs"
@@ -116,11 +121,16 @@ async def get_overall_stats():
116121
"status": "active" if hasattr(chatbot, 'langchain_agent_service') else "inactive",
117122
"type": "OpenAI GPT",
118123
"active_sessions": len(getattr(chatbot, 'active_sessions', {}))
124+
},
125+
"crowd": {
126+
"status": "active",
127+
"type": "Congestion Prediction"
119128
}
120129
},
121130
"endpoints": {
122131
"recommendation": ["/api/recommend", "/api/recommend/health"],
123-
"chatbot": ["/api/chat", "/api/chat/stream", "/api/chat/stats", "/api/chat/health"]
132+
"chatbot": ["/api/chat", "/api/chat/stream", "/api/chat/stats", "/api/chat/health"],
133+
"crowd": ["/api/crowd"]
124134
},
125135
"timestamp": datetime.now().isoformat()
126136
}

MLOps/app/model/deepfm_train.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,21 @@
77
from deepctr_torch.models import DeepFM
88
from deepctr_torch.inputs import SparseFeat, VarLenSparseFeat, get_feature_names
99
import ast
10-
from tensorflow.keras.preprocessing.sequence import pad_sequences
1110
import os
1211
import pickle
1312

13+
def pad_sequences(sequences, maxlen, padding='post', value=0):
    """Pad or truncate integer sequences to a fixed length using NumPy.

    Minimal stand-in for the Keras ``pad_sequences`` helper that this module
    previously imported.

    Args:
        sequences: Sized iterable of integer sequences (lists or arrays).
        maxlen: Width of every output row.
        padding: ``'post'`` keeps the first ``maxlen`` items and fills on the
            right; any other value keeps the last ``maxlen`` items and fills
            on the left.
        value: Fill value for the padded positions.

    Returns:
        ``np.ndarray`` of shape ``(len(sequences), maxlen)`` and dtype int32.
    """
    padded = np.full((len(sequences), maxlen), value, dtype=np.int32)
    pad_right = padding == 'post'
    for row, seq in enumerate(sequences):
        if pad_right:
            kept = seq[:maxlen]
        else:
            # seq[-0:] would return the whole sequence, so guard maxlen == 0.
            kept = seq[-maxlen:] if maxlen else seq[:0]
        count = len(kept)
        if count == 0:
            # Nothing to copy. Skipping also avoids the `-0:` slice, which
            # selects the entire row and fails to broadcast an empty value.
            continue
        if pad_right:
            padded[row, :count] = kept
        else:
            padded[row, maxlen - count:] = kept
    return padded
24+
1425
class DeepFMModdelTrain:
1526
def __init__(self, data_path):
1627
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
@@ -57,16 +68,16 @@ def encode(x):
5768
self.data[self.sequence_feature] = pad_sequences(self.data[self.sequence_feature], maxlen=self.max_len, padding='post', value=0)
5869

5970
# 최종 feature 생성
60-
self.sparse_features = [SparseFeat(feature,
71+
sparse_feature_columns = [SparseFeat(feature,
6172
vocabulary_size=self.data[feature].nunique(),
6273
embedding_dim=4) for feature in sparse_feature_names]
6374

64-
self.sequence_feature = [VarLenSparseFeat(SparseFeat(self.sequence_feature,
75+
sequence_feature_columns = [VarLenSparseFeat(SparseFeat(self.sequence_feature,
6576
vocabulary_size=len(self.key2index) + 1,
6677
embedding_dim=4), maxlen=self.max_len, combiner="mean")]
6778

68-
self.linear_feature_columns = self.sparse_features + self.sequence_feature
69-
self.dnn_feature_columns = self.sparse_features + self.sequence_feature
79+
self.linear_feature_columns = sparse_feature_columns + sequence_feature_columns
80+
self.dnn_feature_columns = sparse_feature_columns + sequence_feature_columns
7081

7182
self.feature_names = get_feature_names(self.linear_feature_columns + self.dnn_feature_columns)
7283

@@ -78,13 +89,13 @@ def encode(x):
7889
with open(self.encoders_path, 'wb') as f:
7990
pickle.dump(self.label_encoders, f)
8091
with open(self.key2index_path, 'wb') as f:
81-
pickle.dump(self.key2index, f)
92+
pickle.dump({'key2index': self.key2index, 'max_len': self.max_len}, f)
8293

8394
def train(self):
8495
model = DeepFM(self.linear_feature_columns,
8596
self.dnn_feature_columns,
8697
task="regression",
87-
device=self.device)
98+
device=str(self.device))
8899

89100
model.compile("adam", "mse", metrics=["mse"])
90101

@@ -106,11 +117,14 @@ def predict(self, input_data):
106117
with open(self.encoders_path, 'rb') as f:
107118
self.label_encoders = pickle.load(f)
108119
with open(self.key2index_path, 'rb') as f:
109-
self.key2index = pickle.load(f)
110-
111-
# 예측에 필요한 메타데이터 재구성
112-
temp_like_list = self.data[self.sequence_feature].apply(ast.literal_eval)
113-
self.max_len = max(len(x) for x in temp_like_list)
120+
key2index_data = pickle.load(f)
121+
if isinstance(key2index_data, dict) and 'key2index' in key2index_data:
122+
self.key2index = key2index_data['key2index']
123+
self.max_len = key2index_data['max_len']
124+
else:
125+
# 이전 버전 호환성
126+
self.key2index = key2index_data
127+
self.max_len = 50 # 기본값
114128

115129
sparse_feature_names = ["userid", "name", "age", "gender", "place_id", "place_name", "category", "subcategory"]
116130

@@ -170,7 +184,7 @@ def encode_sequence(x):
170184
model = DeepFM(self.linear_feature_columns,
171185
self.dnn_feature_columns,
172186
task="regression",
173-
device=self.device)
187+
device=str(self.device))
174188
model.load_state_dict(torch.load(self.model_path))
175189
model.compile("adam", "mse", metrics=["mse"])
176190

16.9 KB
Binary file not shown.

MLOps/app/routers/crowd.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from fastapi import APIRouter
22
from app.schema.crowd_schema import CrowdResponse, CrowdLevel, CrowdInfo
33

4-
router = APIRouter()
4+
router = APIRouter(prefix="/api", tags=["crowd"])
55

6-
@router.get("/api/crowd", response_model=CrowdResponse)
6+
@router.get("/crowd", response_model=CrowdResponse)
77
async def get_crowd(hour: int):
88
"""
99
## 혼잡도 예측 API
Lines changed: 51 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,68 @@
1-
import mysql.connector
2-
from mysql.connector import pooling, Error
31
import pandas as pd
42
import json
53
import os
4+
from dotenv import load_dotenv
5+
from sqlalchemy import create_engine, exc, text
6+
7+
# .env 파일에서 환경 변수 로드
8+
load_dotenv()
69

710
class DatabaseService:
811
"""DB 커넥션 풀을 이용한 쿼리 서비스 클래스 (환경 변수 설정 및 타임아웃 적용)"""
912
def __init__(self):
    """Create the SQLAlchemy engine (connection pool) from environment variables.

    Reads DB_HOST, DB_PORT, DB_USER, DB_PASSWORD and DB_DATABASE, all of
    which are required. DB_POOL_SIZE is optional and defaults to 5.

    On a missing/invalid setting or a SQLAlchemy error the problem is
    printed and ``self.engine`` stays ``None``; no exception is propagated
    for those cases.
    """
    from urllib.parse import quote_plus

    self.engine = None
    try:
        db_host = os.getenv('DB_HOST')
        db_port = os.getenv('DB_PORT')
        db_user = os.getenv('DB_USER')
        db_password = os.getenv('DB_PASSWORD')
        db_database = os.getenv('DB_DATABASE')

        if not all([db_host, db_port, db_user, db_password, db_database]):
            raise ValueError("DB 연결을 위한 모든 환경 변수가 설정되지 않았습니다.")

        # A non-numeric DB_POOL_SIZE raises ValueError, handled below.
        pool_size = int(os.getenv('DB_POOL_SIZE', 5))

        # SQLAlchemy URI for MySQL Connector/Python. Credentials are
        # percent-encoded so characters such as '@', ':' or '/' in the
        # password cannot be mistaken for URL delimiters.
        db_uri = (
            "mysql+mysqlconnector://"
            f"{quote_plus(db_user)}:{quote_plus(db_password)}"
            f"@{db_host}:{db_port}/{db_database}"
        )

        self.engine = create_engine(
            db_uri,
            pool_size=pool_size,
            pool_recycle=3600,  # recycle connections older than one hour
            connect_args={'connect_timeout': 10},
        )
        print("SQLAlchemy 커넥션 풀 생성 성공")
    except (exc.SQLAlchemyError, ValueError) as e:
        print(f"커넥션 풀 생성 오류: {e}")
3137

32-
def _get_connection(self, timeout=3):
33-
"""풀에서 커넥션을 가져옵니다. 타임아웃을 적용하여 무한 대기를 방지합니다."""
34-
if not self.pool:
35-
print("커넥션 풀을 사용할 수 없습니다.")
36-
return None
37-
try:
38-
# 타임아웃(초)을 설정하여 커넥션을 기다립니다.
39-
return self.pool.get_connection(timeout=timeout)
40-
except pooling.PoolError as e:
41-
print(f"풀에서 커넥션을 가져오는 데 실패했습니다 (타임아웃 또는 풀 문제): {e}")
42-
return None
43-
except Error as e:
44-
print(f"커넥션 가져오는 중 알 수 없는 오류 발생: {e}")
45-
return None
38+
def close_connection(self):
    """Dispose of the engine's connection pool when an engine exists."""
    engine = self.engine
    if not engine:
        return
    engine.dispose()
    print("커넥션 풀 종료")
4642

4743
def execute_query(self, query, params=None):
    """Run a SQL query and return the result as a DataFrame.

    Args:
        query: SQL string; it is wrapped in SQLAlchemy ``text()`` before
            execution.
        params: Optional bind parameters forwarded to ``pandas.read_sql``.

    Returns:
        The resulting ``pandas.DataFrame``, or ``None`` when no engine is
        available or the query fails.
    """
    if not self.engine:
        print("DB 엔진을 사용할 수 없습니다.")
        return None

    try:
        # The context manager returns the connection to the pool on exit,
        # including when read_sql raises.
        with self.engine.connect() as connection:
            return pd.read_sql(text(query), connection, params=params)
    except (exc.SQLAlchemyError, pd.errors.DatabaseError) as e:
        # NOTE(review): recent pandas versions appear to re-raise execution
        # failures as pandas.errors.DatabaseError; catching both keeps the
        # "return None on failure" contract either way.
        print(f"쿼리 실행 오류: {e}")
        return None
56+
57+
def user_table_query(self):
    """Return the first ten rows of the ``user`` table via ``execute_query``."""
    sample_sql = """
        SELECT * FROM user LIMIT 10;
        """
    return self.execute_query(sample_sql)
6362

6463
def get_user_info_by_user_id(self, user_id):
6564
"""user_id로 사용자 정보 조회"""
66-
query = """
65+
query = f"""
6766
SELECT
6867
HEX(u.id) AS user_id,
6968
u.name AS user_name,
@@ -81,23 +80,16 @@ def get_user_info_by_user_id(self, user_id):
8180
) AS jt ON TRUE
8281
LEFT JOIN tag t ON t.id = UNHEX(REPLACE(jt.tag_id, '-', ''))
8382
WHERE
84-
u.id = UNHEX(%s)
83+
u.id = UNHEX(:user_id_hex)
8584
GROUP BY
8685
u.id, c.id;
8786
"""
88-
user_id_hex = user_id[2:] if user_id.startswith('0x') else user_id
89-
return self.execute_query(query, params=(user_id_hex,))
90-
91-
if __name__ == '__main__':
92-
# 아래 코드는 웹 프레임워크(예: FastAPI)의 시작 지점에서 한 번만 실행되어야 합니다.
93-
# export DB_HOST=... 와 같은 방식으로 환경 변수 설정 후 실행할 수 있습니다.
94-
db_service = DatabaseService()
95-
96-
if db_service.pool:
97-
# 특정 사용자 데이터 조회 예시
98-
test_user_id = '0x0034B410791D47A38ABFE03E0898A61A'
99-
user_data_df = db_service.get_user_info_by_user_id(test_user_id)
100-
101-
if user_data_df is not None:
102-
print(f"{test_user_id} 사용자의 전체 데이터를 성공적으로 가져왔습니다.")
103-
print(user_data_df.to_string())
87+
user_id_hex = user_id.replace('-', '')
88+
df = self.execute_query(query, params={'user_id_hex': user_id_hex})
89+
if df is not None:
90+
df = df.rename(columns={
91+
'user_id': 'userid',
92+
'user_name': 'name',
93+
'tag_names': 'like_list'
94+
})
95+
return df

MLOps/app/services/elk_client.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import requests
22
from typing import List, Dict, Any
3+
import pandas as pd
34

45
class ELKClient:
56
"""ELK 서버 클라이언트"""
@@ -27,3 +28,21 @@ async def search_places(self, query: str, max_results: int = 23) -> List[Dict[st
2728
except Exception as e:
2829
print(f"ELK 서버 호출 실패: {e}")
2930
return []
31+
32+
def load_user_click_log(self, user_id: str, days: int = 30, timeout: float = 10.0):
    """Fetch a user's click log from the ELK server as a DataFrame.

    Args:
        user_id: Identifier placed in the request path.
        days: Sent as the ``days`` query parameter.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A DataFrame built from the ``logs`` list of the JSON response with
        lower-cased column names (empty when there are no logs), or
        ``None`` when the response status is not 200 or the request fails.
    """
    try:
        # Without a timeout, requests can wait on an unresponsive server
        # indefinitely.
        response = requests.get(
            f"{self.elk_url}/api/click-log/user/{user_id}",
            params={'days': days},
            timeout=timeout,
        )

        if response.status_code != 200:
            return None

        df = pd.DataFrame(response.json().get("logs", []))
        # An empty frame has no string column labels, so the `.str` accessor
        # can raise; that would be misreported below as a server failure.
        if len(df.columns):
            df.columns = df.columns.str.lower()
        return df
    except Exception as e:
        # Deliberate best-effort: any failure is logged and reported as None.
        print(f"ELK 서버 호출 실패: {e}")
        return None

MLOps/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,5 @@ openai==1.68.2
3838

3939
# DB Connection
4040
mysql-connector-python==8.4.0
41+
SQLAlchemy==2.0.31
4142
javaobj-py3==0.4.4

0 commit comments

Comments
 (0)