Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions backend/app/common/named_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
MySQL Named Lock 유틸리티

여러 워커에서 동시 실행을 방지하기 위한 분산 락을 제공합니다.
MySQL의 GET_LOCK/RELEASE_LOCK 함수를 사용합니다.
"""

from contextlib import asynccontextmanager
from logging import Logger
from typing import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.common.model.base import get_session_maker

logger = Logger(__name__)


@asynccontextmanager
async def named_lock(
lock_name: str,
timeout: int = 0,
) -> AsyncGenerator[bool, None]:
"""
MySQL Named Lock 컨텍스트 매니저

@param lock_name: 락 이름 (고유 식별자)
@param timeout: 락 획득 대기 시간 (초, 기본: 0 - 즉시 반환)
@return: 락 획득 성공 여부

사용 예시:
async with named_lock("trade_execution") as acquired:
if acquired:
# 락 획득 성공 - 작업 실행
await do_work()
else:
# 락 획득 실패 - 다른 워커가 실행 중
pass
"""
session_maker = get_session_maker()
acquired = False

# Named Lock은 세션 레벨이므로 별도 세션 사용
async with session_maker() as session:
try:
# GET_LOCK(name, timeout) - 락 획득 시도
# 반환값: 1 (성공), 0 (타임아웃), NULL (에러)
result = await session.execute(
text("SELECT GET_LOCK(:lock_name, :timeout)"),
{"lock_name": lock_name, "timeout": timeout},
)
lock_result = result.scalar()
acquired = lock_result == 1

if acquired:
logger.info(f"락 획득 성공: {lock_name}")
else:
logger.info(f"락 획득 실패: {lock_name} (다른 워커가 실행 중)")

yield acquired

finally:
# 락을 획득한 경우에만 해제
if acquired:
await session.execute(
text("SELECT RELEASE_LOCK(:lock_name)"),
{"lock_name": lock_name},
)
logger.info(f"락 해제: {lock_name}")
24 changes: 15 additions & 9 deletions backend/app/configs/scheduling_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from logging import Logger

from app.common.model.base import get_session_maker
from app.common.named_lock import named_lock
from app.trade.service.trade_service import TradeService

logger = Logger(__name__)
Expand All @@ -12,17 +13,22 @@ async def trade_execution_job() -> None:
주기적으로 실행되는 자동 거래 작업

모든 활성화된 코인에 대해 AI 분석을 수행하고 거래를 실행합니다.
MySQL Named Lock을 사용하여 여러 워커에서 동시 실행을 방지합니다.
"""

session_maker = get_session_maker()
# Named Lock 획득 시도 (즉시 반환, 대기 없음)
async with named_lock("trade_execution", timeout=0) as acquired:
if not acquired:
logger.info("🤩 다른 워커가 거래 작업을 실행 중입니다. 스킵합니다.")
return

try:
async with session_maker() as session:
trade_service = TradeService(session=session)
logger.info("🚀 자동 거래 작업 시작")
await trade_service.execute()
session_maker = get_session_maker()

except Exception as e:
logger.info(f"❌ 거래 실행 중 오류 발생: {str(e)}\n{traceback.format_exc()}")
try:
async with session_maker() as session:
trade_service = TradeService(session=session)
logger.info("🚀 자동 거래 작업 시작")
await trade_service.execute()

traceback.print_exc()
except Exception as e:
logger.error(f"거래 실행 중 오류 발생: {str(e)}\n{traceback.format_exc()}")