Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.example.RealMatch.chat.domain.enums.ChatSystemMessageKind;
import com.example.RealMatch.chat.presentation.dto.response.ChatSystemMessagePayload;

import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;

/**
Expand All @@ -34,14 +35,32 @@ public class SystemMessageRetrySender {

private static final Logger LOG = LoggerFactory.getLogger(SystemMessageRetrySender.class);

private static final String METRIC_LOGICAL_FAILURE = "chat.system_message.logical_failure";

private final ProcessedEventStore processedEventStore;
private final SystemMessageSender systemMessageSender;
private final MeterRegistry meterRegistry;

/**
* IN_PROGRESS 상태의 TTL. 짧게 설정하여 프로세스 다운 시 빠르게 만료 → 재처리 가능.
* 전송 + 재시도(3회, backoff 최대 800ms)를 충분히 커버하는 시간으로 설정.
*/
private static final Duration IN_PROGRESS_TTL = Duration.ofMinutes(3);

private static final Duration EVENT_IDEMPOTENCY_TTL = Duration.ofHours(6);
/**
* PROCESSED 상태의 TTL. 길게 설정하여 동일 이벤트의 중복 처리를 장기간 방지.
*/
private static final Duration PROCESSED_TTL = Duration.ofHours(6);

/**
* 시스템 메시지를 멱등성 체크 후 전송합니다.
*
* <p>2단계 상태머신:
* <ol>
* <li>IN_PROGRESS(짧은 TTL)로 선점 → 프로세스 다운 시 빠르게 만료되어 재처리 가능</li>
* <li>전송 성공 시 PROCESSED(긴 TTL)로 승격 → 중복 처리 장기간 방지</li>
* </ol>
*
* <p>반환값은 전송 성공 여부를 의미합니다.
* - true: 전송 성공 및 markAsProcessed 완료 (at-least-once 보장)
* - false: 중복 이벤트 또는 전송 실패
Expand All @@ -60,10 +79,11 @@ public boolean sendWithIdempotency(
throw new IllegalArgumentException("idempotencyKey cannot be null");
}

// 멱등성 체크: false=중복만, throw=Redis 장애 등 판단 불가 → 상위로 throw하여 DLQ 처리
boolean isNewEvent = processedEventStore.markIfNotProcessed(idempotencyKey, EVENT_IDEMPOTENCY_TTL);
// 멱등성 체크: IN_PROGRESS(짧은 TTL)로 선점
// false=중복만, throw=Redis 장애 등 판단 불가 → 상위로 throw하여 DLQ 처리
boolean isNewEvent = processedEventStore.markIfNotProcessed(idempotencyKey, IN_PROGRESS_TTL);
if (!isNewEvent) {
LOG.info("[RetrySender] Event already processed, skipping. key={}, eventType={}",
LOG.info("[RetrySender] Event already processed or in progress, skipping. key={}, eventType={}",
idempotencyKey, eventType);
return false;
}
Expand All @@ -74,13 +94,15 @@ public boolean sendWithIdempotency(
} catch (LogicalFailureException ex) {
// 논리적 실패: removeProcessed로 키 제거하고 false 반환 (DLQ 기록 안 함)
processedEventStore.removeProcessed(idempotencyKey);
recordLogicalFailure(eventType, resolveLogicalFailureReason(ex));
LOG.warn("[RetrySender] Logical failure. key={}, eventType={}, error={}",
idempotencyKey, eventType, ex.getCause() != null ? ex.getCause().getMessage() : ex.getMessage(), ex);
return false;
} catch (ChatRoomNotFoundException | IllegalArgumentException ex) {
// fallback: @Recover가 동작하지 않은 경우 (proxy 비활성화 등)
// 논리적 실패만 removeProcessed: 재시도해도 동일 결과이므로 키 제거 (레이스/중복 가능성 없음)
processedEventStore.removeProcessed(idempotencyKey);
recordLogicalFailure(eventType, ex.getClass().getSimpleName());
LOG.error("[RetrySender] Logical failure (fallback - proxy inactive suspected). " +
"key={}, eventType={}, error={}. " +
"This should be handled by @Recover. Check Spring Retry proxy configuration.",
Expand All @@ -95,10 +117,26 @@ public boolean sendWithIdempotency(
throw ex;
}

// 전송 성공 후 markAsProcessed: 실패 시 throw → 상위에서 DLQ 처리 (성공처럼 넘기지 않음)
processedEventStore.markAsProcessed(idempotencyKey, EVENT_IDEMPOTENCY_TTL);
// 전송 성공: IN_PROGRESS → PROCESSED로 승격 (긴 TTL)
// 실패 시 throw → 상위에서 DLQ 처리 (성공처럼 넘기지 않음)
processedEventStore.markAsProcessed(idempotencyKey, PROCESSED_TTL);
LOG.info("[RetrySender] System message sent successfully. key={}, roomId={}, kind={}",
idempotencyKey, roomId, messageKind);
return true;
}

private void recordLogicalFailure(String eventType, String reason) {
meterRegistry.counter(
METRIC_LOGICAL_FAILURE,
"eventType", eventType != null ? eventType : "unknown",
"reason", reason != null ? reason : "unknown"
).increment();
}

private static String resolveLogicalFailureReason(LogicalFailureException ex) {
if (ex.getCause() != null) {
return ex.getCause().getClass().getSimpleName();
}
return "LogicalFailureException";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public class FailedEventDlq {
private static final Logger LOG = LoggerFactory.getLogger(FailedEventDlq.class);
private static final String DLQ_KEY = "chat:dlq";

/**
* DLQ 최대 보관 건수.
* RPUSH 후 LTRIM으로 최근 N건만 유지하여 Redis 메모리 무한 증가를 방지합니다.
*/
private static final long MAX_DLQ_SIZE = 10_000;

private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;

Expand All @@ -43,6 +49,8 @@ public void enqueueFailedEvent(

String json = objectMapper.writeValueAsString(entry);
redisTemplate.opsForList().rightPush(DLQ_KEY, json);
// 최근 MAX_DLQ_SIZE 건만 유지하여 메모리 무한 증가 방지
redisTemplate.opsForList().trim(DLQ_KEY, -MAX_DLQ_SIZE, -1);
Comment thread
1000hyehyang marked this conversation as resolved.

LOG.info("[DLQ] Failed event enqueued. eventType={}, eventId={}, roomId={}, error={}",
eventType, eventId, roomId, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,46 @@
/**
* 이벤트 중복 처리 방지를 위한 멱등성 저장소 인터페이스.
*
* <p>동일한 이벤트 ID가 이미 처리되었는지 확인하고, 처리되지 않은 경우에만 마킹합니다.
* Redis SETNX + TTL 패턴을 사용하여 멱등성을 보장합니다.
*
* <h3>계약</h3>
* <p>2단계 상태머신으로 "처리중(IN_PROGRESS)"과 "처리완료(PROCESSED)"를 분리합니다.
* <ul>
* <li>markIfNotProcessed: true=선점 성공(처리 가능), false=중복만 의미.
* <li>markIfNotProcessed: IN_PROGRESS 상태로 선점 (짧은 TTL).
* 프로세스 다운 시 짧은 TTL로 자동 만료 → 재처리 가능 (조용한 유실 방지).
* true=선점 성공(처리 가능), false=중복(이미 처리중이거나 처리완료).
* Redis 장애/응답 null/타임아웃 등 판단 불가 시 예외 throw → 호출자가 실패로 처리(DLQ/재시도)</li>
* <li>markAsProcessed: void, 실패 시 throw (전송 성공 후 호출, 실패 시 호출자가 DLQ 처리)</li>
* <li>removeProcessed: 논리적 실패 또는 전송 최종 실패(재시도 소진) 시 호출. 최종 실패 시 키 제거해야 재처리 시 유실 없음. 중간 일시적 실패에서만 호출 금지. 실패 시 swallow(로그만)</li>
* <li>markAsProcessed: PROCESSED 상태로 승격 (긴 TTL).
* 전송 성공 후 호출하여 중복 처리를 장기간 방지.
* 실패 시 throw (호출자가 DLQ 처리)</li>
* <li>removeProcessed: 논리적 실패 또는 전송 최종 실패(재시도 소진) 시 호출.
* 최종 실패 시 키 제거해야 재처리 시 유실 없음. 중간 일시적 실패에서는 호출 금지.
* 실패 시 swallow(로그만). TTL로 자동 만료되므로 치명적이지 않음.</li>
* </ul>
*/
public interface ProcessedEventStore {

/**
* 이벤트가 이미 처리되었는지 확인하고, 처리되지 않은 경우에만 마킹(선점)합니다.
* 이벤트가 이미 처리되었는지 확인하고, 처리되지 않은 경우에만 IN_PROGRESS 상태로 마킹(선점)합니다.
*
* <p>짧은 TTL(inProgressTtl)로 설정되어, 프로세스 다운 시 빠르게 만료되어 재처리 가능합니다.
*
* @return true=선점 성공(처리 가능), false=중복(이미 처리됨)
* @throws com.example.RealMatch.chat.application.exception.IdempotencyStoreException Redis 장애, 응답 null, 타임아웃 등 판단 불가 시
* @param eventId 이벤트 고유 식별자
* @param inProgressTtl IN_PROGRESS 상태의 TTL (짧게: 1~5분 권장)
* @return true=선점 성공(처리 가능), false=중복(이미 처리중이거나 처리완료)
* @throws com.example.RealMatch.chat.application.exception.IdempotencyStoreException
* Redis 장애, 응답 null, 타임아웃 등 판단 불가 시
*/
boolean markIfNotProcessed(String eventId, Duration ttl);
boolean markIfNotProcessed(String eventId, Duration inProgressTtl);

/**
* 이벤트 처리 완료를 마킹합니다 (전송 성공 후 호출).
* 이벤트 처리 완료를 PROCESSED 상태로 마킹합니다 (전송 성공 후 호출).
*
* <p>긴 TTL(processedTtl)로 설정되어, 동일 이벤트의 중복 처리를 장기간 방지합니다.
*
* @throws com.example.RealMatch.chat.application.exception.IdempotencyStoreException Redis 장애 등 저장 실패 시
* @param eventId 이벤트 고유 식별자
* @param processedTtl PROCESSED 상태의 TTL (길게: 6시간~수일 권장)
* @throws com.example.RealMatch.chat.application.exception.IdempotencyStoreException
* Redis 장애 등 저장 실패 시
*/
void markAsProcessed(String eventId, Duration ttl);
void markAsProcessed(String eventId, Duration processedTtl);

/**
* 이벤트 처리 마킹을 삭제합니다.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
/**
* Redis 기반 이벤트 중복 처리 방지 저장소 구현체.
*
* <p>2단계 상태머신:
* <ul>
* <li>IN_PROGRESS (짧은 TTL): 선점 직후 상태. 프로세스 다운 시 빠르게 만료 → 재처리 가능.</li>
* <li>PROCESSED (긴 TTL): 전송 성공 후 상태. 동일 이벤트의 중복 처리를 장기간 방지.</li>
* </ul>
*
* <p>markIfNotProcessed: 장애/NULL → throw (false 금지). false는 "중복" 의미로만 사용.
*/
@Component
Expand All @@ -23,39 +29,47 @@ public class RedisProcessedEventStore implements ProcessedEventStore {
private static final Logger LOG = LoggerFactory.getLogger(RedisProcessedEventStore.class);
private static final String KEY_PREFIX = "chat:processed:";

/** 선점(처리 시작) 상태 값. 짧은 TTL과 함께 사용되어 프로세스 다운 시 빠르게 만료됩니다. */
private static final String VALUE_IN_PROGRESS = "IN_PROGRESS";
/** 처리 완료 상태 값. 긴 TTL과 함께 사용되어 중복 처리를 장기간 방지합니다. */
private static final String VALUE_PROCESSED = "PROCESSED";

private final StringRedisTemplate redisTemplate;

@Override
public boolean markIfNotProcessed(String eventId, Duration ttl) {
public boolean markIfNotProcessed(String eventId, Duration inProgressTtl) {
if (eventId == null) {
throw new IllegalArgumentException("eventId cannot be null");
}
if (ttl == null || ttl.isNegative() || ttl.isZero()) {
throw new IllegalArgumentException("ttl must be positive");
if (inProgressTtl == null || inProgressTtl.isNegative() || inProgressTtl.isZero()) {
throw new IllegalArgumentException("inProgressTtl must be positive");
}

String key = KEY_PREFIX + eventId;
long ttlSeconds = ttl.getSeconds();
long ttlSeconds = inProgressTtl.getSeconds();

try {
// SET key value NX EX ttlSeconds
// NX: 키가 존재하지 않을 때만 설정
// EX: TTL 설정 (초 단위)
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "1", ttl);
// SET key IN_PROGRESS NX EX ttlSeconds
// NX: 키가 존재하지 않을 때만 설정 (선점)
// EX: 짧은 TTL → 프로세스 다운 시 빠르게 만료되어 재처리 가능
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, VALUE_IN_PROGRESS, inProgressTtl);

if (result == null) {
// Redis 응답 null = 판단 불가 → 반드시 실패로 올림 (DLQ/재시도)
String msg = String.format("Redis returned null for SETNX. eventId=%s, key=%s", eventId, key);
String msg = String.format(
"Redis returned null for SETNX. eventId=%s, key=%s", eventId, key);
LOG.error("[Idempotency] {}", msg);
throw new IdempotencyStoreException(msg);
}

if (result) {
LOG.debug("[Idempotency] Event marked as processing. eventId={}, ttl={}s", eventId, ttlSeconds);
LOG.debug("[Idempotency] Event marked as IN_PROGRESS. eventId={}, ttl={}s",
eventId, ttlSeconds);
return true; // 선점 성공 (처리 가능)
} else {
LOG.debug("[Idempotency] Duplicate event detected. eventId={}", eventId);
return false; // 중복만 의미
return false; // 중복만 의미 (IN_PROGRESS 또는 PROCESSED 상태)
}

} catch (IdempotencyStoreException e) {
Expand All @@ -70,23 +84,25 @@ public boolean markIfNotProcessed(String eventId, Duration ttl) {
}

@Override
public void markAsProcessed(String eventId, Duration ttl) {
public void markAsProcessed(String eventId, Duration processedTtl) {
if (eventId == null) {
throw new IllegalArgumentException("eventId cannot be null");
}
if (ttl == null || ttl.isNegative() || ttl.isZero()) {
throw new IllegalArgumentException("ttl must be positive");
if (processedTtl == null || processedTtl.isNegative() || processedTtl.isZero()) {
throw new IllegalArgumentException("processedTtl must be positive");
}

String key = KEY_PREFIX + eventId;
long ttlSeconds = ttl.getSeconds();
long ttlSeconds = processedTtl.getSeconds();

try {
// 전송 성공 후 키를 확실히 남기기 위해 SET EX 사용
redisTemplate.opsForValue().set(key, "1", ttl);
LOG.debug("[Idempotency] Event marked as processed (success). eventId={}, ttl={}s", eventId, ttlSeconds);
// IN_PROGRESS → PROCESSED로 승격. 긴 TTL로 중복 처리를 장기간 방지.
redisTemplate.opsForValue().set(key, VALUE_PROCESSED, processedTtl);
LOG.debug("[Idempotency] Event promoted to PROCESSED. eventId={}, ttl={}s",
eventId, ttlSeconds);
} catch (Exception ex) {
String msg = String.format("Failed to mark event as processed. eventId=%s, key=%s, ttl=%ds",
String msg = String.format(
"Failed to mark event as PROCESSED. eventId=%s, key=%s, ttl=%ds",
eventId, key, ttlSeconds);
LOG.error("[Idempotency] {}", msg, ex);
throw new IdempotencyStoreException(msg, ex);
Expand All @@ -105,7 +121,8 @@ public void removeProcessed(String eventId) {
redisTemplate.delete(key);
LOG.debug("[Idempotency] Removed processed event key. eventId={}", eventId);
} catch (Exception ex) {
LOG.warn("[Idempotency] Failed to remove processed event key. eventId={}, key={}", eventId, key, ex);
LOG.warn("[Idempotency] Failed to remove processed event key. eventId={}, key={}",
eventId, key, ex);
// 삭제 실패해도 TTL로 자동 만료되므로 치명적이지 않음
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public interface ChatRoomRepository extends JpaRepository<ChatRoom, Long>, ChatR
r.lastMessagePreview = :messagePreview,
r.lastMessageType = :messageType
WHERE r.id = :roomId
AND (r.lastMessageAt IS NULL OR r.lastMessageAt < :messageAt)
AND (r.lastMessageAt IS NULL
OR r.lastMessageAt < :messageAt
OR (r.lastMessageAt = :messageAt AND r.lastMessageId < :messageId))
""")
int updateLastMessageIfNewer(
@Param("roomId") Long roomId,
Expand Down
Loading