diff --git a/.gitignore b/.gitignore index 40bacdf4..701f059b 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ src/main/resources/application.properties docker-compose.dev.yml /data/ /db/ +/replicaset # 환경변수 .env diff --git a/build.gradle b/build.gradle index 8612ce11..f0c7c626 100644 --- a/build.gradle +++ b/build.gradle @@ -55,7 +55,7 @@ dependencies { // DB runtimeOnly 'com.h2database:h2' - runtimeOnly 'com.mysql:mysql-connector-j' + implementation 'com.mysql:mysql-connector-j' annotationProcessor 'org.projectlombok:lombok' @@ -120,6 +120,10 @@ dependencies { //AES 암호화 implementation 'javax.xml.bind:jaxb-api:2.3.1' + + // Circuit Breaker + implementation 'io.github.resilience4j:resilience4j-spring-boot3:2.2.0' + implementation 'io.github.resilience4j:resilience4j-circuitbreaker:2.2.0' } tasks.named('test') { diff --git a/restart-mongodb-replica.sh b/restart-mongodb-replica.sh new file mode 100644 index 00000000..7979170e --- /dev/null +++ b/restart-mongodb-replica.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +DATA_FILE_PATH="./replicaset" +DOCKER_FILE_PATH="./docker-compose.dev.yml" +MONGO_PRIMARY_NAME="rs01p" +REPLICA_INIT_FILE_PATH="./scripts/rs-init.sh" +MONGO_CREATE_USER_FILE_PATH="./scripts/mongo-create-user.sh" + +UP_CONTAINER_DELAY=10 +REPLICA_CONFIG_DELAY=25 + +echo "****************** Reset docker container Shell Script ******************" +echo "Data File Path: ${DATA_FILE_PATH}" +echo "Docker File Path: ${DOCKER_FILE_PATH}" +echo "MongoDB Primary name: ${MONGO_PRIMARY_NAME}" +echo "Replica set init Script File Path: ${REPLICA_INIT_FILE_PATH}" +echo "Mongo create user file path: ${MONGO_CREATE_USER_FILE_PATH}" + +sleep 1; + +echo "****************** Stop docker container ******************" +docker-compose -f ${DOCKER_FILE_PATH} stop +echo "****************** Completed Stop docker container ******************" + +sleep 1; + +echo "****************** Down docker container ******************" +docker-compose -f ${DOCKER_FILE_PATH} down +echo "****************** Completed Down docker container ******************" + +sleep 1; + +echo "****************** Remove Data ******************" +rm -rf ${DATA_FILE_PATH} +echo "****************** Completed Remove Data ******************" + +sleep 1; + +echo "****************** Up docker container ******************" +docker-compose -f ${DOCKER_FILE_PATH} up -d +echo "****************** Completed Up docker container ******************" + +echo "****** Waiting for ${UP_CONTAINER_DELAY} seconds ******" +sleep $UP_CONTAINER_DELAY; + +echo "****************** Run Replica Set Shell Script ******************" +docker exec -i ${MONGO_PRIMARY_NAME} bash < ${REPLICA_INIT_FILE_PATH} + +echo "****** Waiting for ${REPLICA_CONFIG_DELAY} seconds for replicaset configuration to be applied ******" +sleep $REPLICA_CONFIG_DELAY + +echo "****************** Run Create DB User Shell Script ******************" +docker exec -i ${MONGO_PRIMARY_NAME} bash < "${MONGO_CREATE_USER_FILE_PATH}" + +echo "****************** Completed Replica Shell Script ******************" diff --git a/scripts/mongo-create-user.sh b/scripts/mongo-create-user.sh new file mode 100644 index 00000000..0bd73128 --- /dev/null +++ b/scripts/mongo-create-user.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +mongosh --port 10021 <= maxRetries) { + this.status = ProcessStatus.PERMANENTLY_FAILED; + } else { + this.status = ProcessStatus.FAILED; + } + } + + public void updateLastAttempt() { + this.lastAttemptAt = LocalDateTime.now(); + } +} diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/model/NotificationQueueConstants.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/model/NotificationQueueConstants.java index c959a684..e93db4a5 100644 --- a/src/main/java/org/ezcode/codetest/infrastructure/notification/model/NotificationQueueConstants.java +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/model/NotificationQueueConstants.java @@ -4,6 +4,8 @@ public final class NotificationQueueConstants { private NotificationQueueConstants() {} + public static final String CUSTOM_HEADER_MESSAGE_ID = "custom_message_id"; + public static final String NOTIFICATION_QUEUE_CREATE = "notification.queue.create"; public static final String NOTIFICATION_QUEUE_LIST = "notification.queue.list"; public static final String NOTIFICATION_QUEUE_MARK_READ = "notification.queue.read"; diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/publisher/NotificationEventPublisher.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/publisher/NotificationEventPublisher.java index bba3aa70..53605a34 100644 --- a/src/main/java/org/ezcode/codetest/infrastructure/notification/publisher/NotificationEventPublisher.java +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/publisher/NotificationEventPublisher.java @@ -2,6 +2,8 @@ import static org.ezcode.codetest.infrastructure.notification.model.NotificationQueueConstants.*; +import java.util.UUID; + import org.ezcode.codetest.application.notification.event.*; import org.ezcode.codetest.application.notification.exception.NotificationException; import org.ezcode.codetest.application.notification.exception.NotificationExceptionCode; @@ -27,21 +29,18 @@ public class NotificationEventPublisher implements NotificationEventService { public void notify(NotificationCreateEvent dto) { sendMessage(NOTIFICATION_QUEUE_CREATE, dto); - // publisher.publishEvent(dto); } @Override public void notifyList(NotificationListRequestEvent dto) { sendMessage(NOTIFICATION_QUEUE_LIST, dto); - // publisher.publishEvent(dto); } @Override public void setRead(NotificationMarkReadEvent dto) { sendMessage(NOTIFICATION_QUEUE_MARK_READ, dto); - // publisher.publishEvent(dto); } private void sendMessage(String destination, Object data) { @@ -49,7 +48,12 @@ private void sendMessage(String destination, Object data) { try { String jsonMessage = objectMapper.writeValueAsString(data); - jmsTemplate.convertAndSend(destination, jsonMessage); + String customMessageId = "ID-" + UUID.randomUUID(); + jmsTemplate.convertAndSend(destination, jsonMessage, message -> { + message.setStringProperty(CUSTOM_HEADER_MESSAGE_ID, customMessageId); + return message; + }); + log.info("알림 메시지 전송 성공 ({}) : {}", destination, jsonMessage); } catch (JsonProcessingException ex) { log.error("알림 메시지 변환 및 전송 실패 : {}", ex.getMessage()); diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/repository/NotificationProcessLogRepository.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/repository/NotificationProcessLogRepository.java new file mode 100644 index 00000000..51e2766c --- /dev/null +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/repository/NotificationProcessLogRepository.java @@ -0,0 +1,12 @@ +package org.ezcode.codetest.infrastructure.notification.repository; + +import java.util.List; + +import org.ezcode.codetest.infrastructure.notification.model.NotificationProcessLog; +import org.springframework.data.mongodb.repository.MongoRepository; + +public interface NotificationProcessLogRepository extends MongoRepository { + + // 재시도할 작업들을 찾는 쿼리 메서드 + List findByStatusAndRetryCountLessThan(NotificationProcessLog.ProcessStatus status, int maxRetries); +} diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationEventListener.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationEventListener.java index 1c62db01..87915e16 100644 --- a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationEventListener.java +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationEventListener.java @@ -1,80 +1,25 @@ package org.ezcode.codetest.infrastructure.notification.service; -import static org.ezcode.codetest.infrastructure.notification.model.NotificationQueueConstants.*; - -import org.ezcode.codetest.application.notification.event.NotificationCreateEvent; -import org.ezcode.codetest.application.notification.event.NotificationListRequestEvent; -import org.ezcode.codetest.application.notification.event.NotificationMarkReadEvent; -import org.ezcode.codetest.application.notification.exception.NotificationException; -import org.ezcode.codetest.application.notification.exception.NotificationExceptionCode; -import org.ezcode.codetest.infrastructure.notification.dto.NotificationPageResponse; -import org.ezcode.codetest.infrastructure.notification.model.NotificationDocument; -import org.ezcode.codetest.infrastructure.notification.dto.NotificationResponse; -import org.springframework.jms.annotation.JmsListener; +import org.ezcode.codetest.infrastructure.notification.event.NotificationSavedEvent; import org.springframework.messaging.simp.SimpMessagingTemplate; -import org.springframework.stereotype.Service; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -@Slf4j -@Service +@Component @RequiredArgsConstructor public class NotificationEventListener { - private final NotificationService notificationService; - private final SimpMessagingTemplate messagingTemplate; - private final ObjectMapper objectMapper; - - @JmsListener(destination = NOTIFICATION_QUEUE_CREATE) - public void handleNotificationCreateEvent(String message) { - - NotificationCreateEvent event = convertObject(message, NotificationCreateEvent.class); - NotificationDocument notification = notificationService.createNewNotification(event); - + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + public void handleNotificationSavedEvent(NotificationSavedEvent event) { messagingTemplate.convertAndSendToUser( event.principalName(), "/queue/notification", - NotificationResponse.from(notification) + event.response() ); } - - @JmsListener(destination = NOTIFICATION_QUEUE_LIST) - public void handleNotificationListRequestEvent(String message) { - - NotificationListRequestEvent event = convertObject(message, NotificationListRequestEvent.class); - NotificationPageResponse notifications = notificationService.getNotifications(event); - - messagingTemplate.convertAndSendToUser( - event.principalName(), - "/queue/notifications", - notifications - ); - } - - @JmsListener(destination = NOTIFICATION_QUEUE_MARK_READ) - public void handleNotificationReadEvent(String message) { - - NotificationMarkReadEvent event = convertObject(message, NotificationMarkReadEvent.class); - notificationService.markAsRead(event); - } - - private T convertObject(String message, Class targetClass) { - - try { - T dto = objectMapper.readValue(message, targetClass); - log.info("알림 객체 {} 변환 성공", targetClass.getSimpleName()); - return dto; - } catch (JsonProcessingException ex) { - log.error("알림 객체 {} 변환 실패", targetClass.getSimpleName(), ex); - throw new NotificationException( - NotificationExceptionCode.NOTIFICATION_CONVERT_MESSAGE_ERROR - ); - } - } } diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationQueueConsumer.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationQueueConsumer.java new file mode 100644 index 00000000..4dbb4019 --- /dev/null +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationQueueConsumer.java @@ -0,0 +1,95 @@ +package org.ezcode.codetest.infrastructure.notification.service; + +import static org.ezcode.codetest.infrastructure.notification.model.NotificationQueueConstants.*; + +import org.ezcode.codetest.application.notification.event.NotificationCreateEvent; +import org.ezcode.codetest.application.notification.event.NotificationListRequestEvent; +import org.ezcode.codetest.application.notification.event.NotificationMarkReadEvent; +import org.ezcode.codetest.application.notification.exception.NotificationException; +import org.ezcode.codetest.application.notification.exception.NotificationExceptionCode; +import org.ezcode.codetest.infrastructure.notification.dto.NotificationPageResponse; +import org.ezcode.codetest.infrastructure.notification.dto.NotificationResponse; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class NotificationQueueConsumer { + + private final NotificationService notificationService; + + private final SimpMessagingTemplate messagingTemplate; + + private final ObjectMapper objectMapper; + private final ProcessLogService processLogService; + + @JmsListener(destination = NOTIFICATION_QUEUE_CREATE) + public void handleNotificationCreateEvent(Message message) { + + String messageId = (String) message.getHeaders().get(CUSTOM_HEADER_MESSAGE_ID); + String payload = message.getPayload(); + + if (messageId == null) { + log.error("커스텀 메시지 ID 헤더가 없어 메시지를 처리할 수 없습니다. payload={}", payload); + return; + } + + if (!processLogService.startProcessing(messageId, payload)) { + log.warn("이미 처리되었거나 처리 중인 메시지입니다. messageId: {}", messageId); + return; + } + + try { + NotificationCreateEvent event = convertObject(payload, NotificationCreateEvent.class); + notificationService.createNewNotification(event); + + processLogService.finishProcessing(messageId); + } catch (Exception e) { + log.error("메시지 처리 실패. 재시도를 위해 FAILED로 기록합니다. messageId: {}", messageId); + processLogService.failProcessing(messageId, e.getMessage()); + } + } + + @JmsListener(destination = NOTIFICATION_QUEUE_LIST) + public void handleNotificationListRequestEvent(String message) { + + NotificationListRequestEvent event = convertObject(message, NotificationListRequestEvent.class); + NotificationPageResponse notifications = notificationService.getNotifications(event); + + messagingTemplate.convertAndSendToUser( + event.principalName(), + "/queue/notifications", + notifications + ); + } + + @JmsListener(destination = NOTIFICATION_QUEUE_MARK_READ) + public void handleNotificationReadEvent(String message) { + + NotificationMarkReadEvent event = convertObject(message, NotificationMarkReadEvent.class); + notificationService.markAsRead(event); + } + + private T convertObject(String message, Class targetClass) { + + try { + T dto = objectMapper.readValue(message, targetClass); + log.info("알림 객체 {} 변환 성공", targetClass.getSimpleName()); + return dto; + } catch (JsonProcessingException ex) { + log.error("알림 객체 {} 변환 실패", targetClass.getSimpleName(), ex); + throw new NotificationException( + NotificationExceptionCode.NOTIFICATION_CONVERT_MESSAGE_ERROR + ); + } + } +} diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationRetryScheduler.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationRetryScheduler.java new file mode 100644 index 00000000..9cd3cdd0 --- /dev/null +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationRetryScheduler.java @@ -0,0 +1,38 @@ +package org.ezcode.codetest.infrastructure.notification.service; + +import static org.ezcode.codetest.infrastructure.notification.model.NotificationQueueConstants.*; + +import java.util.List; + +import org.ezcode.codetest.infrastructure.notification.model.NotificationProcessLog; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class NotificationRetryScheduler { + + private final ProcessLogService processLogService; + private final JmsTemplate jmsTemplate; + + @Scheduled(fixedRate = 300000) // 5분마다 실행 + public void retryFailedNotifications() { + + log.info("실패할 알림 재처리를 시작합니다..."); + List retryableJobs = processLogService.findRetryableJobs(); + + for (NotificationProcessLog job : retryableJobs) { + log.info("재처리 시도: messageId={}", job.getMessageId()); + + jmsTemplate.convertAndSend(NOTIFICATION_QUEUE_CREATE, job.getPayload(), message -> { + message.setStringProperty(CUSTOM_HEADER_MESSAGE_ID, job.getMessageId()); + return message; + }); + } + } +} diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationService.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationService.java index 4ac21bdb..3974e05f 100644 --- a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationService.java +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/NotificationService.java @@ -9,22 +9,31 @@ import org.ezcode.codetest.application.notification.exception.NotificationExceptionCode; import org.ezcode.codetest.infrastructure.notification.dto.NotificationPageResponse; import org.ezcode.codetest.infrastructure.notification.dto.NotificationResponse; +import org.ezcode.codetest.infrastructure.notification.event.NotificationSavedEvent; import org.ezcode.codetest.infrastructure.notification.model.NotificationDocument; import org.ezcode.codetest.infrastructure.notification.repository.NotificationMongoRepository; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import lombok.extern.slf4j.Slf4j; @Service +@Slf4j public class NotificationService { private final NotificationMongoRepository mongoRepository; + private final ApplicationEventPublisher publisher; public NotificationService( - NotificationMongoRepository mongoRepository + NotificationMongoRepository mongoRepository, ApplicationEventPublisher publisher ) { this.mongoRepository = mongoRepository; + this.publisher = publisher; } public NotificationPageResponse getNotifications(NotificationListRequestEvent event) { @@ -45,11 +54,19 @@ public NotificationPageResponse getNotifications(Notificat ); } - public NotificationDocument createNewNotification(NotificationCreateEvent event) { + @CircuitBreaker(name = "db-circuit", fallbackMethod = "createNewNotificationFallback") + @Transactional(transactionManager = "mongoTransactionManager") + public void createNewNotification(NotificationCreateEvent event) { + + NotificationDocument savedNotification = mongoRepository.save(NotificationDocument.from(event)); - return mongoRepository.save(NotificationDocument.from(event)); + publisher.publishEvent(new NotificationSavedEvent( + event.principalName(), + NotificationResponse.from(savedNotification) + )); } + @Transactional(transactionManager = "mongoTransactionManager") public void markAsRead(NotificationMarkReadEvent event) { NotificationDocument notificationDocument = mongoRepository @@ -59,4 +76,22 @@ public void markAsRead(NotificationMarkReadEvent event) { notificationDocument.markAsRead(); mongoRepository.save(notificationDocument); } + + /** + * createNewNotification의 Fallback 메서드 + * 서킷이 열리면 이 메서드가 대신 실행됨 + * @param event 원본 메서드와 동일한 파라미터 + * @param ex 발생한 예외 + */ + private void createNewNotificationFallback(NotificationCreateEvent event, Throwable ex) { + // 어떤 예외 때문에 서킷이 열렸는지 로그를 남김 + log.warn("Circuit Breaker is open for createNewNotification. Event: {}, Error: {}", event, ex.getMessage()); + + // 메시지를 "처리 실패"로 알려서 MQ가 재시도하거나 DLQ로 보내도록 함 + throw new NotificationException( + NotificationExceptionCode.NOTIFICATION_DB_ERROR, + ex, + ex.getMessage() + ); + } } diff --git a/src/main/java/org/ezcode/codetest/infrastructure/notification/service/ProcessLogService.java b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/ProcessLogService.java new file mode 100644 index 00000000..491e66f4 --- /dev/null +++ b/src/main/java/org/ezcode/codetest/infrastructure/notification/service/ProcessLogService.java @@ -0,0 +1,93 @@ +package org.ezcode.codetest.infrastructure.notification.service; + +import java.util.List; +import java.util.Optional; + +import org.ezcode.codetest.infrastructure.notification.model.NotificationProcessLog; +import org.ezcode.codetest.infrastructure.notification.model.NotificationProcessLog.ProcessStatus; +import org.ezcode.codetest.infrastructure.notification.repository.NotificationProcessLogRepository; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Service +@Slf4j +@RequiredArgsConstructor +public class ProcessLogService { + + private final NotificationProcessLogRepository processLogRepository; + + @Value("${application.notification.max-retries:5}") + private int maxRetries; + + /** + * 메시지 처리 시작 및 중복 저장 검사 + * @param messageId 메시지 고유 ID + * @param payload 메시지 본문 + * @return 처리를 계속해야 하면 true, 중복 메시지이면 false를 반환 + */ + @Transactional(transactionManager = "mongoTransactionManager") + public boolean startProcessing(String messageId, String payload) { + + Optional existingLogOpt = processLogRepository.findById(messageId); + + if (existingLogOpt.isPresent()) { + NotificationProcessLog existingLog = existingLogOpt.get(); + + // 1. 이미 성공한 경우 -> 중복이므로 처리 중단 + if (existingLog.getStatus() == ProcessStatus.SUCCESS) { + log.warn("이미 성공적으로 처리된 메시지입니다. messageId={}", messageId); + return false; + } + + // 2. 실패했거나, 중간에 멈춘(PENDING) 경우 -> 재처리를 위해 계속 진행 + // 이때는 새 로그를 만드는 대신, 마지막 시도 시간만 업데이트 + log.info("실패했거나 PENDING 상태인 메시지를 재시도합니다. messageId={}", messageId); + existingLog.updateLastAttempt(); + processLogRepository.save(existingLog); + return true; + } + + NotificationProcessLog newLog = NotificationProcessLog.of(messageId, payload); + processLogRepository.save(newLog); + return true; + } + + // 메시지 처리 성공으로 기록 + @Transactional(transactionManager = "mongoTransactionManager") + public void finishProcessing(String messageId) { + + NotificationProcessLog log = processLogRepository.findById(messageId) + .orElseThrow(() -> { + ProcessLogService.log.error("처리 완료할 로그를 찾을 수 없습니다. messageId={}", messageId); + return new IllegalStateException("Process log not found: " + messageId); + }); + + log.markAsSuccess(); + processLogRepository.save(log); + + } + + // 메시지 처리 실패로 기록 + @Transactional(transactionManager = "mongoTransactionManager") + public void failProcessing(String messageId, String errorMessage) { + + NotificationProcessLog log = processLogRepository.findById(messageId) + .orElseThrow(() -> { + ProcessLogService.log.error("실패 기록할 로그를 찾을 수 없습니다. messageId={}", messageId); + return new IllegalStateException("Process log not found: " + messageId); + }); + log.markAsFailed(errorMessage, maxRetries); + processLogRepository.save(log); + } + + // 재시도할 작업 목록 조회 + @Transactional(readOnly = true, transactionManager = "mongoTransactionManager") + public List findRetryableJobs() { + + return processLogRepository.findByStatusAndRetryCountLessThan(ProcessStatus.FAILED, maxRetries); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 06db0400..b39c0146 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -89,8 +89,6 @@ spring.security.oauth2.client.registration.google.scope=profile,email # ======================== spring.data.mongodb.uri=${SPRING_MONGODB_URI} spring.data.mongodb.auto-index-creation=true -logging.level.org.mongodb.driver.protocol.command=DEBUG -logging.level.org.springframework.data.mongodb.core.MongoTemplate=DEBUG # ======================== # Spring Cache Caffeine @@ -162,6 +160,17 @@ spring.datasource.hikari.connection-timeout=30000 spring.datasource.hikari.validation-timeout=5000 # ======================== +# Circuit Breaker +# ======================== +resilience4j.circuitbreaker.instances.db-circuit.sliding-window-size=10 +resilience4j.circuitbreaker.instances.db-circuit.minimum-number-of-calls=5 +resilience4j.circuitbreaker.instances.db-circuit.failure-rate-threshold=50 +resilience4j.circuitbreaker.instances.db-circuit.wait-duration-in-open-state=10s +resilience4j.circuitbreaker.instances.db-circuit.slow-call-duration-threshold=2s +resilience4j.circuitbreaker.instances.db-circuit.slow-call-rate-threshold=60 +resilience4j.circuitbreaker.instances.db-circuit.register-health-indicator=true +resilience4j.circuitbreaker.instances.db-circuit.record-exceptions=org.springframework.dao.DataAccessResourceFailureException,org.springframework.dao.DataAccessException,com.mongodb.MongoTimeoutException,java.net.ConnectException,java.util.concurrent.TimeoutException + # App Redirects (Email Verify) # ======================== # 인증 성공 시 이동할 페이지 URL (예: 프론트의 성공 안내 페이지) diff --git a/src/test/java/org/ezcode/codetest/infrastructure/notification/MongoTransactionTest.java b/src/test/java/org/ezcode/codetest/infrastructure/notification/MongoTransactionTest.java new file mode 100644 index 00000000..cd8694c4 --- /dev/null +++ b/src/test/java/org/ezcode/codetest/infrastructure/notification/MongoTransactionTest.java @@ -0,0 +1,33 @@ +package org.ezcode.codetest.infrastructure.notification; + +import org.ezcode.codetest.application.notification.enums.NotificationType; +import org.ezcode.codetest.application.notification.event.NotificationCreateEvent; +import org.ezcode.codetest.infrastructure.notification.model.NotificationDocument; +import org.ezcode.codetest.infrastructure.notification.repository.NotificationMongoRepository; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.transaction.annotation.Transactional; + +@SpringBootTest +@Disabled +@ActiveProfiles("test") +public class MongoTransactionTest { + + @Autowired + private NotificationMongoRepository mongoRepository; + + @Test + @Transactional + void transactionTest() { + mongoRepository.save(NotificationDocument.from( + NotificationCreateEvent.of( + "test@test.com", + NotificationType.COMMUNITY_DISCUSSION_VOTED_UP, + null + ) + )); + } +} diff --git a/src/test/java/org/ezcode/codetest/infrastructure/notification/NotificationCircuitBreakTest.java b/src/test/java/org/ezcode/codetest/infrastructure/notification/NotificationCircuitBreakTest.java new file mode 100644 index 00000000..ad232dc5 --- /dev/null +++ b/src/test/java/org/ezcode/codetest/infrastructure/notification/NotificationCircuitBreakTest.java @@ -0,0 +1,168 @@ +package org.ezcode.codetest.infrastructure.notification; + +import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.time.Duration; + +import org.ezcode.codetest.application.notification.enums.NotificationType; +import org.ezcode.codetest.application.notification.event.NotificationCreateEvent; +import org.ezcode.codetest.application.notification.exception.NotificationException; +import org.ezcode.codetest.infrastructure.notification.model.NotificationDocument; +import org.ezcode.codetest.infrastructure.notification.repository.NotificationMongoRepository; +import org.ezcode.codetest.infrastructure.notification.service.NotificationService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; + +@Disabled +@SpringBootTest(properties = { + // 테스트 실행 속도를 위해 서킷 대기 시간을 짧게 조정 + "resilience4j.circuitbreaker.instances.db-circuit.wait-duration-in-open-state=2s" +}) +@ActiveProfiles("test") +public class NotificationCircuitBreakTest { + + @Autowired + private NotificationService notificationService; + + @Autowired + private CircuitBreakerRegistry circuitBreakerRegistry; + + @MockitoBean + private NotificationMongoRepository mongoRepository; + + private CircuitBreaker dbCircuitBreaker; + + @BeforeEach + void setUp() { + // 테스트 시작 전에 서킷 브레이커를 초기화 (CLOSED 상태로 강제) + dbCircuitBreaker = circuitBreakerRegistry.circuitBreaker("db-circuit"); + dbCircuitBreaker.reset(); + } + + @AfterEach + void tearDown() { + mongoRepository.deleteAll(); + } + + @Test + @DisplayName("1. 정상 상황: DB가 안정적일 때 서킷은 CLOSED 상태를 유지한다") + void whenDbIsStable_thenCircuitRemainsClosed() { + // Given + NotificationCreateEvent event = createNotificationCreateEvent(); + NotificationDocument dummyDocument = NotificationDocument.from(event); + + when(mongoRepository.save(any(NotificationDocument.class))).thenReturn(dummyDocument); + + // When & Then + for (int i = 0; i < 10; i++) { + assertDoesNotThrow(() -> notificationService.createNewNotification(createNotificationCreateEvent())); + } + assertThat(dbCircuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); + System.out.println("✅ 성공: 정상 상황 테스트 완료"); + } + + @Test + @DisplayName("2. 장애 발생: DB 장애 반복 시 서킷이 OPEN 상태로 변경된다") + void whenDbFailureRepeats_thenCircuitOpens() { + // Given + String errorMessage = "DB Connection Failed"; + when(mongoRepository.save(any(NotificationDocument.class))) + .thenThrow(new DataAccessResourceFailureException(errorMessage)); + + // When & Then + for (int i = 0; i < 5; i++) { + assertThatThrownBy(() -> notificationService.createNewNotification(createNotificationCreateEvent())) + .isInstanceOf(NotificationException.class) + .hasCauseInstanceOf(DataAccessResourceFailureException.class); + } + + assertThat(dbCircuitBreaker.getState()).isEqualTo(CircuitBreaker.State.OPEN); + System.out.println("✅ 성공: 장애 발생 시 OPEN 전환 테스트 완료"); + } + + @Test + @DisplayName("3. 자동 복구: 서킷이 열린 후, 시간이 지나면 HALF_OPEN을 거쳐 CLOSED로 복구된다") + void whenCircuitIsOpen_andAfterWaitDuration_thenTransitionsToHalfOpenAndClosed() throws InterruptedException { + // Given + NotificationCreateEvent event = createNotificationCreateEvent(); + NotificationDocument dummyDocument = NotificationDocument.from(event); + + dbCircuitBreaker.transitionToOpenState(); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + when(mongoRepository.save(any(NotificationDocument.class))).thenReturn(dummyDocument); + + // When + for (int i = 0; i < 5; i++) { + notificationService.createNewNotification(createNotificationCreateEvent()); + } + + // Then + assertThat(dbCircuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); + System.out.println("✅ 성공: 자동 복구 테스트 완료"); + } + + @Test + @DisplayName("4. 복구 실패: HALF_OPEN 상태에서 호출이 실패하면 다시 OPEN 상태로 돌아간다") + void whenCircuitIsHalfOpen_andCallFails_thenTransitionsBackToOpen() throws InterruptedException { + // Given + String errorMessage = "DB Still Down"; + int minimumCalls = 5; + + // 1. 서킷을 OPEN 상태로 만들고, HALF_OPEN이 될 때까지 대기 + dbCircuitBreaker.transitionToOpenState(); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + + // 2. HALF_OPEN 상태에서 DB가 여전히 장애 상황임을 흉내 + when(mongoRepository.save(any(NotificationDocument.class))) + .thenThrow(new DataAccessResourceFailureException(errorMessage)); + + // When: 최소 호출 횟수만큼 실패를 유도 + for (int i = 0; i < minimumCalls; i++) { + assertThatThrownBy(() -> notificationService.createNewNotification(createNotificationCreateEvent())) + .isInstanceOf(NotificationException.class); + } + + // Then: 이제 충분한 실패 데이터가 쌓였으므로 서킷은 다시 OPEN 상태로 돌아가야 함 + assertThat(dbCircuitBreaker.getState()).isEqualTo(CircuitBreaker.State.OPEN); + System.out.println("✅ 성공: 복구 실패 테스트 완료"); + } + + @Test + @DisplayName("5. 서킷이 OPEN일 때 Fallback 동작 검증") + void whenCircuitIsOpen_thenFallbackIsExecuted() { + // Given + dbCircuitBreaker.transitionToOpenState(); + + // When & Then + assertThatThrownBy(() -> notificationService.createNewNotification(createNotificationCreateEvent())) + .isInstanceOf(NotificationException.class) + .hasCauseInstanceOf(CallNotPermittedException.class); + + // DB Repository는 절대 호출되지 않았어야 함 + verify(mongoRepository, never()).save(any(NotificationDocument.class)); + System.out.println("✅ 성공: OPEN 상태에서 Fallback 동작 검증 완료"); + } + + + private NotificationCreateEvent createNotificationCreateEvent() { + return NotificationCreateEvent.of( + "test@test.com", + NotificationType.COMMUNITY_DISCUSSION_VOTED_UP, + null + ); + } +} diff --git a/src/test/java/org/ezcode/codetest/infrastructure/notification/NotificationIntegrationTest.java b/src/test/java/org/ezcode/codetest/infrastructure/notification/NotificationIntegrationTest.java new file mode 100644 index 00000000..fb005598 --- /dev/null +++ b/src/test/java/org/ezcode/codetest/infrastructure/notification/NotificationIntegrationTest.java @@ -0,0 +1,174 @@ +package org.ezcode.codetest.infrastructure.notification; + +import static org.assertj.core.api.Assertions.*; +import static org.ezcode.codetest.infrastructure.notification.model.NotificationQueueConstants.*; +import static org.mockito.Mockito.*; + +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; + +import org.awaitility.Awaitility; +import org.ezcode.codetest.application.notification.enums.NotificationType; +import org.ezcode.codetest.application.notification.event.NotificationCreateEvent; +import org.ezcode.codetest.infrastructure.notification.model.NotificationProcessLog; +import org.ezcode.codetest.infrastructure.notification.model.NotificationProcessLog.ProcessStatus; +import org.ezcode.codetest.infrastructure.notification.repository.NotificationMongoRepository; +import org.ezcode.codetest.infrastructure.notification.repository.NotificationProcessLogRepository; +import org.ezcode.codetest.infrastructure.notification.service.NotificationRetryScheduler; +import org.ezcode.codetest.infrastructure.notification.service.NotificationService; +import org.ezcode.codetest.infrastructure.notification.service.ProcessLogService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.bean.override.mockito.MockitoSpyBean; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +@Disabled +@SpringBootTest +@ActiveProfiles("test") +public class NotificationIntegrationTest { + + @Autowired + private JmsTemplate jmsTemplate; + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private NotificationMongoRepository notificationRepository; + + @Autowired + private NotificationProcessLogRepository processLogRepository; + + @Autowired + private NotificationRetryScheduler retryScheduler; + + // 실제 객체를 사용하되, 특정 메서드의 동작을 조작하기 위해 @SpyBean 사용 + @MockitoSpyBean + private NotificationService notificationService; + + @MockitoSpyBean + private ProcessLogService processLogService; + + @AfterEach + void tearDown() { + notificationRepository.deleteAll(); + processLogRepository.deleteAll(); + } + + @Test + @DisplayName("1. 정상 시나리오: 메시지가 성공적으로 처리되고 상태가 SUCCESS로 기록된다") + void happyPath_shouldProcessMessageSuccessfully() { + // Given: 정상 메시지 준비 + String messageId = "ID-" + UUID.randomUUID(); + String payload = createDummyPayload(); + + // When: 메시지를 JMS 큐로 전송 + sendMessageToQueue(messageId, payload); + + // Then: 비동기 처리가 완료될 때까지 대기 후 검증 + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + // 1. 처리 상태 로그가 SUCCESS로 저장되었는지 확인 + Optional logOpt = processLogRepository.findById(messageId); + assertThat(logOpt).isPresent(); + assertThat(logOpt.get().getStatus()).isEqualTo(ProcessStatus.SUCCESS); + + // 2. 실제 알림 데이터가 DB에 저장되었는지 확인 + assertThat(notificationRepository.count()).isEqualTo(1); + }); + } + + @Test + @DisplayName("2. 자동 복구 시나리오: 첫 시도 실패 후, 스케줄러에 의해 재처리되어 성공한다") + void whenTransientErrorOccurs_shouldBeRecoveredByScheduler() { + // Given: 메시지 준비 및 첫 시도에만 DB 장애가 발생하도록 설정 + String messageId = "ID-" + UUID.randomUUID(); + String payload = createDummyPayload(); + + // 첫 번째 processCreationEvent 호출 시에만 예외를 던지고, 그 이후에는 실제 메서드를 호출하도록 설정 + doThrow(new DataAccessResourceFailureException("DB is temporarily down")) + .doCallRealMethod() + .when(notificationService).createNewNotification(any(NotificationCreateEvent.class)); + + // When: 1. 첫 번째 메시지 전송 (실패 유도) + sendMessageToQueue(messageId, payload); + + // Then: 1. 상태가 FAILED로 기록될 때까지 대기 + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + Optional logOpt = processLogRepository.findById(messageId); + assertThat(logOpt).isPresent(); + assertThat(logOpt.get().getStatus()).isEqualTo(ProcessStatus.FAILED); + // 아직 성공 전이므로 알림 데이터는 없어야 함 + assertThat(notificationRepository.count()).isZero(); + }); + + // When: 2. 재처리 스케줄러 수동 실행 + System.out.println("--- 재처리 스케줄러 실행 ---"); + retryScheduler.retryFailedNotifications(); + + // Then: 2. 재처리가 성공하여 상태가 SUCCESS로 변경될 때까지 대기 + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + Optional logOpt = processLogRepository.findById(messageId); + assertThat(logOpt).isPresent(); + assertThat(logOpt.get().getStatus()).isEqualTo(ProcessStatus.SUCCESS); + // 최종적으로 알림 데이터가 저장되었는지 확인 + assertThat(notificationRepository.count()).isEqualTo(1); + }); + } + + @Test + @DisplayName("3. 멱등성 시나리오: 성공한 메시지를 다시 보내도 중복 처리되지 않는다") + void whenSameMessageIsSent_shouldNotBeProcessedAgain() { + // Given: 메시지를 한 번 성공적으로 처리 + String messageId = "ID-" + UUID.randomUUID(); + String payload = createDummyPayload(); + sendMessageToQueue(messageId, payload); + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> + assertThat(processLogRepository.findById(messageId).get().getStatus()).isEqualTo(ProcessStatus.SUCCESS) + ); + // UseCase가 1번 호출되었는지 확인 + verify(notificationService, times(1)).createNewNotification(any(NotificationCreateEvent.class)); + + // When: 동일한 메시지를 다시 전송 + System.out.println("--- 동일 메시지 재전송 ---"); + sendMessageToQueue(messageId, payload); + + // 잠시 대기 (소비자가 메시지를 처리할 시간) + try { Thread.sleep(2000); } catch (InterruptedException e) {} + + // Then: UseCase가 추가로 호출되지 않았어야 함 (총 호출 횟수가 여전히 1이어야 함) + verify(notificationService, times(1)).createNewNotification(any(NotificationCreateEvent.class)); + // 최종 데이터도 1개여야 함 + assertThat(notificationRepository.count()).isEqualTo(1); + } + + private void sendMessageToQueue(String messageId, String payload) { + jmsTemplate.convertAndSend(NOTIFICATION_QUEUE_CREATE, payload, message -> { + message.setStringProperty(CUSTOM_HEADER_MESSAGE_ID, messageId); + return message; + }); + } + + private String createDummyPayload() { + try { + return objectMapper.writeValueAsString( + NotificationCreateEvent.of( + "test@test.com", + NotificationType.COMMUNITY_DISCUSSION_VOTED_UP, + null + ) + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index ceb203f0..8c2cb1ba 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -85,8 +85,6 @@ spring.security.oauth2.client.registration.google.scope=profile,email # ======================== spring.data.mongodb.uri=${SPRING_MONGODB_URI} spring.data.mongodb.auto-index-creation=true -logging.level.org.mongodb.driver.protocol.command=DEBUG -logging.level.org.springframework.data.mongodb.core.MongoTemplate=DEBUG # ======================== # Spring Cache Caffeine @@ -105,3 +103,15 @@ discord.webhook.url=${DISCORD_WEBHOOK_URL} #logging.level.org.springframework.security.oauth2=DEBUG #logging.level.org.springframework.security.oauth2.client=TRACE #logging.level.org.springframework.web.client.RestTemplate=TRACE + +# ======================== +# Circuit Breaker +# ======================== +resilience4j.circuitbreaker.instances.db-circuit.sliding-window-size=10 +resilience4j.circuitbreaker.instances.db-circuit.minimum-number-of-calls=5 +resilience4j.circuitbreaker.instances.db-circuit.failure-rate-threshold=50 +resilience4j.circuitbreaker.instances.db-circuit.wait-duration-in-open-state=10s +resilience4j.circuitbreaker.instances.db-circuit.slow-call-duration-threshold=2s +resilience4j.circuitbreaker.instances.db-circuit.slow-call-rate-threshold=60 +resilience4j.circuitbreaker.instances.db-circuit.register-health-indicator=true +resilience4j.circuitbreaker.instances.db-circuit.record-exceptions=org.springframework.dao.DataAccessResourceFailureException,org.springframework.dao.DataAccessException,com.mongodb.MongoTimeoutException,java.net.ConnectException,java.util.concurrent.TimeoutException \ No newline at end of file