diff --git a/product/build.gradle b/product/build.gradle index e433bc1..6d542df 100644 --- a/product/build.gradle +++ b/product/build.gradle @@ -1,4 +1,4 @@ -plugins { + plugins { id 'java' id 'org.springframework.boot' version '3.4.4' id 'io.spring.dependency-management' version '1.1.7' @@ -45,7 +45,6 @@ dependencies { testAnnotationProcessor 'org.mapstruct:mapstruct-processor:1.5.3.Final' testImplementation 'org.springframework.kafka:spring-kafka-test' - testImplementation 'org.springframework.kafka:spring-kafka-test' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'com.h2database:h2' testImplementation 'org.testcontainers:testcontainers:1.19.3' diff --git a/product/src/main/java/com/sangyunpark/product/application/StockService.java b/product/src/main/java/com/sangyunpark/product/application/StockService.java index 4230e5b..f0d7763 100644 --- a/product/src/main/java/com/sangyunpark/product/application/StockService.java +++ b/product/src/main/java/com/sangyunpark/product/application/StockService.java @@ -4,6 +4,7 @@ import com.sangyunpark.product.constant.ErrorCode; import com.sangyunpark.product.exception.BusinessException; import com.sangyunpark.product.infrastructure.kafka.StockEventProducer; +import com.sangyunpark.product.infrastructure.redis.OrderDuplicationRepository; import com.sangyunpark.product.infrastructure.redis.StockRedisRepository; import com.sangyunpark.product.infrastructure.repository.StockJpaRepository; import lombok.RequiredArgsConstructor; @@ -20,13 +21,19 @@ public class StockService { private final StockJpaRepository stockJpaRepository; private final StockRedisRepository stockRedisRepository; private final StockEventProducer stockEventProducer; + private final OrderDuplicationRepository orderDuplicationRepository; public void decreaseStockAndPublish(final Long productId, final Long quantity, final Long orderId) { - if(!stockJpaRepository.existsById(productId)) { + if(!orderDuplicationRepository.saveIfAbsent(orderId, Duration.ofSeconds(30L))) { + log.info("이미 처리된 주문 orderId: {}", orderId); + return; + } + + if(!stockRedisRepository.isExisted(productId)) { Long dbQuantity = stockJpaRepository.findQuantityByProductId(productId) .orElseThrow(() -> new BusinessException(ErrorCode.PRODUCT_NOT_FOUND)); - stockRedisRepository.setIfAbsentWithTTL(productId, dbQuantity, Duration.ofSeconds(30)); + stockRedisRepository.setIfAbsentWithTTL(productId, dbQuantity, Duration.ofMinutes(3)); } final Long remainStock = stockRedisRepository.decrease(productId, quantity); diff --git a/product/src/main/java/com/sangyunpark/product/config/KafkaConsumerConfig.java b/product/src/main/java/com/sangyunpark/product/config/KafkaConsumerConfig.java index f919ccb..40a0771 100644 --- a/product/src/main/java/com/sangyunpark/product/config/KafkaConsumerConfig.java +++ b/product/src/main/java/com/sangyunpark/product/config/KafkaConsumerConfig.java @@ -1,6 +1,8 @@ package com.sangyunpark.product.config; import com.sangyunpark.product.application.event.StockDeductedEvent; +import com.sangyunpark.product.global.ExponentialBackOffWithJitter; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -8,12 +10,13 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.util.backoff.FixedBackOff; +@Slf4j @Configuration -public class KafkaConsumerConfig { +public class KafkaConsumerConfig { - final String DLT = ".DLT"; + private final String DLT = ".DLT"; + private final int MAX_RETRY_COUNT = 5; @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( @@ -33,6 +36,14 @@ public DefaultErrorHandler errorHandler(KafkaTemplate new org.apache.kafka.common.TopicPartition(record.topic() + DLT, record.partition())); - return new DefaultErrorHandler(recover, new FixedBackOff(1000L, 3L)); + ExponentialBackOffWithJitter backOff = new ExponentialBackOffWithJitter(1000L, 2.0, 16000L, MAX_RETRY_COUNT); + + DefaultErrorHandler errorHandler = new DefaultErrorHandler(recover, backOff); + + errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> { + log.warn("카프카 컨슈머 재시도 횟수 {} 기록 내용: {}", deliveryAttempt, record); + }); + + return errorHandler; } } diff --git a/product/src/main/java/com/sangyunpark/product/global/ExponentialBackOffWithJitter.java b/product/src/main/java/com/sangyunpark/product/global/ExponentialBackOffWithJitter.java new file mode 100644 index 0000000..702b511 --- /dev/null +++ b/product/src/main/java/com/sangyunpark/product/global/ExponentialBackOffWithJitter.java @@ -0,0 +1,44 @@ +package com.sangyunpark.product.global; + +import java.util.Random; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; + +public class ExponentialBackOffWithJitter implements BackOff { + + private final double multiplier; + private final long maxElapsedTime; + private final int maxAttempts; + + private long currentInterval; + private int attemptCount; + private long elapsedTime; + private final Random random = new Random(); + + public ExponentialBackOffWithJitter(long initialInterval, double multiplier, long maxElapsedTime, int maxAttempts) { + this.maxAttempts= maxAttempts; + this.multiplier = multiplier; + this.maxElapsedTime = maxElapsedTime; + this.currentInterval = initialInterval; + this.elapsedTime = 0; + } + + @Override + public BackOffExecution start() { + return () -> { + if (elapsedTime >= maxElapsedTime || attemptCount >= maxAttempts) { + return BackOffExecution.STOP; + } + + long next = currentInterval; + long jittered = (next > 0) ? random.nextLong(next + 1) : 0; + + currentInterval = Math.min((long) (currentInterval * multiplier), Long.MAX_VALUE / 2); + + elapsedTime += jittered; + attemptCount++; + + return jittered; + }; + } +} \ No newline at end of file diff --git a/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventConsumer.java b/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventConsumer.java index b242c2a..79826b9 100644 --- a/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventConsumer.java +++ b/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventConsumer.java @@ -3,8 +3,6 @@ import com.sangyunpark.product.application.event.StockDeductedEvent; import com.sangyunpark.product.constant.ErrorCode; import com.sangyunpark.product.exception.BusinessException; -import com.sangyunpark.product.infrastructure.redis.OrderDuplicationRepository; -import com.sangyunpark.product.infrastructure.redis.StockRedisRepository; import com.sangyunpark.product.infrastructure.repository.StockJpaRepository; import jakarta.transaction.Transactional; import lombok.RequiredArgsConstructor; @@ -21,24 +19,14 @@ public class StockEventConsumer { private final String GROUP_ID = "product-service"; private final StockJpaRepository stockJpaRepository; - private final OrderDuplicationRepository orderDuplicationRepository; - private final StockRedisRepository stockRedisRepository; @Transactional @KafkaListener(topics = TOPIC, groupId = GROUP_ID) public void consumeStockDeductedEvent(final StockDeductedEvent event) { - if(orderDuplicationRepository.isAlreadyProcessed(event.orderId())) { - log.info("이미 처리된 주문 orderId: {}", event.orderId()); - stockRedisRepository.increase(event.productId(), event.quantity()); - return; - } - int updatedRows = stockJpaRepository.decreaseStock(event.productId(), event.quantity()); if(updatedRows == 0) { throw new BusinessException(ErrorCode.STOCK_NOT_ENOUGH); } - - orderDuplicationRepository.saveProcessed(event.orderId()); } } \ No newline at end of file diff --git a/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducer.java b/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducer.java index 73a924b..7b54780 100644 --- a/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducer.java +++ b/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducer.java @@ -1,7 +1,6 @@ package com.sangyunpark.product.infrastructure.kafka; import com.sangyunpark.product.application.event.StockDeductedEvent; -import com.sangyunpark.product.infrastructure.redis.StockRedisRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.core.KafkaTemplate; @@ -13,15 +12,11 @@ public class StockEventProducer { private final String TOPIC = "stock.deducted"; - - private final StockRedisRepository stockRedisRepository; private final KafkaTemplate kafkaTemplate; public void sendStockDeductedEvent(final StockDeductedEvent stockDeductedEvent) { - - kafkaTemplate.send(TOPIC, stockDeductedEvent) + kafkaTemplate.send(TOPIC, String.valueOf(stockDeductedEvent.productId()), stockDeductedEvent) .exceptionally(ex -> { - stockRedisRepository.increase(stockDeductedEvent.productId(), stockDeductedEvent.quantity()); log.error("Kafka 메시지 전송 실패로 재고 복구 수행"); return null; }); diff --git a/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/dlt/StockDLTListener.java b/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/dlt/StockDLTListener.java index ce19254..b00fff3 100644 --- a/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/dlt/StockDLTListener.java +++ b/product/src/main/java/com/sangyunpark/product/infrastructure/kafka/dlt/StockDLTListener.java @@ -1,16 +1,22 @@ package com.sangyunpark.product.infrastructure.kafka.dlt; import com.sangyunpark.product.application.event.StockDeductedEvent; +import com.sangyunpark.product.infrastructure.redis.StockRedisRepository; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Slf4j @Component +@RequiredArgsConstructor public class StockDLTListener { + private final StockRedisRepository stockRedisRepository; + @KafkaListener(topics = "stock.deducted.DLT", groupId = "product-service-dlt") public void listenStockDLT(final StockDeductedEvent event) { - log.error("DLT 수신 - event: {}", event); + log.error("Dead Letter 수신 - 재고 복구 진행. event: {}", event); + stockRedisRepository.increase(event.productId(), event.quantity()); } } diff --git a/product/src/main/java/com/sangyunpark/product/infrastructure/redis/OrderDuplicationRepository.java b/product/src/main/java/com/sangyunpark/product/infrastructure/redis/OrderDuplicationRepository.java index e5ddb50..28344a3 100644 --- a/product/src/main/java/com/sangyunpark/product/infrastructure/redis/OrderDuplicationRepository.java +++ b/product/src/main/java/com/sangyunpark/product/infrastructure/redis/OrderDuplicationRepository.java @@ -13,11 +13,11 @@ public class OrderDuplicationRepository { private final StringRedisTemplate stringRedisTemplate; private final String PREFIX = "stock:deducted:"; - public boolean isAlreadyProcessed(final Long orderId) { - return Boolean.TRUE.equals(stringRedisTemplate.hasKey(PREFIX + orderId)); + private String getKey(final Long orderId) { + return PREFIX + orderId; } - public void saveProcessed(final Long orderId) { - stringRedisTemplate.opsForValue().set(PREFIX + orderId, "done", Duration.ofMinutes(10)); + public boolean saveIfAbsent(final Long orderId, final Duration ttl) { + return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(getKey(orderId), "done", ttl)); } } diff --git a/product/src/main/java/com/sangyunpark/product/infrastructure/redis/StockRedisRepository.java b/product/src/main/java/com/sangyunpark/product/infrastructure/redis/StockRedisRepository.java index 3af430e..26f1847 100644 --- a/product/src/main/java/com/sangyunpark/product/infrastructure/redis/StockRedisRepository.java +++ b/product/src/main/java/com/sangyunpark/product/infrastructure/redis/StockRedisRepository.java @@ -49,6 +49,11 @@ public Long decrease(final Long productId, final Long amount) { ); } + public boolean isExisted(final Long productId) { + String key = getKey(productId); + return Boolean.TRUE.equals(redisTemplate.hasKey(key)); + } + public void increase(final Long productId, final Long amount) { redisTemplate.opsForValue().increment(getKey(productId), amount); } diff --git a/product/src/test/java/com/sangyunpark/product/application/StockServiceTest.java b/product/src/test/java/com/sangyunpark/product/application/StockServiceTest.java index bdb03eb..5d34e14 100644 --- a/product/src/test/java/com/sangyunpark/product/application/StockServiceTest.java +++ b/product/src/test/java/com/sangyunpark/product/application/StockServiceTest.java @@ -4,6 +4,7 @@ import com.sangyunpark.product.constant.ErrorCode; import com.sangyunpark.product.exception.BusinessException; import com.sangyunpark.product.infrastructure.kafka.StockEventProducer; +import com.sangyunpark.product.infrastructure.redis.OrderDuplicationRepository; import com.sangyunpark.product.infrastructure.redis.StockRedisRepository; import com.sangyunpark.product.infrastructure.repository.StockJpaRepository; import org.junit.jupiter.api.DisplayName; @@ -13,11 +14,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.Duration; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -37,6 +38,9 @@ class StockServiceTest { @Mock private StockEventProducer stockEventProducer; + @Mock + private OrderDuplicationRepository orderDuplicationRepository; + private final String ERROR_CODE = "errorCode"; @Test @@ -44,7 +48,11 @@ class StockServiceTest { void 캐시없음_정상차감() { // given Long productId = 1L, orderId = 100L, quantity = 2L; - given(stockJpaRepository.existsById(productId)).willReturn(false); + Duration duplicationTtl = Duration.ofSeconds(30L); + Duration cacheTtl = Duration.ofMinutes(3L); + + given(orderDuplicationRepository.saveIfAbsent(orderId, duplicationTtl)).willReturn(true); + given(stockRedisRepository.isExisted(productId)).willReturn(false); // given(stockJpaRepository.findQuantityByProductId(productId)).willReturn(Optional.of(10L)); given(stockRedisRepository.decrease(productId, quantity)).willReturn(8L); @@ -52,7 +60,7 @@ class StockServiceTest { stockService.decreaseStockAndPublish(productId, quantity, orderId); // then - verify(stockRedisRepository).setIfAbsentWithTTL(eq(productId), eq(10L), any()); + verify(stockRedisRepository).setIfAbsentWithTTL(productId, 10L, cacheTtl); // verify(stockEventProducer).sendStockDeductedEvent( new StockDeductedEvent(orderId, productId, quantity) ); @@ -63,14 +71,17 @@ class StockServiceTest { void 캐시있음_정상차감() { // given Long productId = 2L, orderId = 200L, quantity = 3L; - given(stockJpaRepository.existsById(productId)).willReturn(true); + Duration duration = Duration.ofSeconds(30L); + + given(orderDuplicationRepository.saveIfAbsent(orderId, duration)).willReturn(true); // ✅ 고쳐야 할 부분 + given(stockRedisRepository.isExisted(productId)).willReturn(true); given(stockRedisRepository.decrease(productId, quantity)).willReturn(7L); // when stockService.decreaseStockAndPublish(productId, quantity, orderId); // then - verify(stockRedisRepository, never()).setIfAbsentWithTTL(any(), any(), any()); + verify(stockRedisRepository, never()).setIfAbsentWithTTL(any(), any(), any()); // 캐시가 이미 존재하므로 호출 X verify(stockEventProducer).sendStockDeductedEvent( new StockDeductedEvent(orderId, productId, quantity) ); @@ -81,7 +92,9 @@ class StockServiceTest { void 재고부족_예외() { // given Long productId = 3L, orderId = 300L, quantity = 20L; - given(stockJpaRepository.existsById(productId)).willReturn(true); + + given(orderDuplicationRepository.saveIfAbsent(orderId, Duration.ofSeconds(30L))).willReturn(true); + given(stockRedisRepository.isExisted(productId)).willReturn(true); given(stockRedisRepository.decrease(productId, quantity)).willReturn(-1L); // expect @@ -95,7 +108,9 @@ class StockServiceTest { void 상품없음_예외() { // given Long productId = 99L, orderId = 400L, quantity = 1L; - given(stockJpaRepository.existsById(productId)).willReturn(false); + + given(orderDuplicationRepository.saveIfAbsent(orderId, Duration.ofSeconds(30L))).willReturn(true); + given(stockRedisRepository.isExisted(productId)).willReturn(false); given(stockJpaRepository.findQuantityByProductId(productId)).willReturn(Optional.empty()); // expect diff --git a/product/src/test/java/com/sangyunpark/product/config/ExponentialBackOffWithJitterTest.java b/product/src/test/java/com/sangyunpark/product/config/ExponentialBackOffWithJitterTest.java new file mode 100644 index 0000000..6865d5a --- /dev/null +++ b/product/src/test/java/com/sangyunpark/product/config/ExponentialBackOffWithJitterTest.java @@ -0,0 +1,102 @@ +package com.sangyunpark.product.config; + +import com.sangyunpark.product.global.ExponentialBackOffWithJitter; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.util.backoff.BackOffExecution; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExponentialBackOffWithJitterTest { + + @Test + @DisplayName("ExponentialBackOffWithJitter - 재시도 간격이 증가하면서 랜덤 대기 후 종료되는지 테스트") + void 재시도_간격이_증가하며_랜덤_대기하고_종료되는지_테스트() { + // given + final long initialInterval = 1000L; + final double multiplier = 2.0; + final long maxElapsedTime = 10000L; + + ExponentialBackOffWithJitter backOff = new ExponentialBackOffWithJitter(initialInterval, multiplier, maxElapsedTime,Integer.MAX_VALUE); + BackOffExecution execution = backOff.start(); + + long elapsed = 0L; + int attempt = 0; + + // when + while (true) { + long nextBackOff = execution.nextBackOff(); + if (nextBackOff == BackOffExecution.STOP) { + System.out.println("재시도 중단 (STOP)"); + break; + } + + System.out.printf("[%d번째 재시도] 대기 시간: %dms%n", ++attempt, nextBackOff); + elapsed += nextBackOff; + } + + // then + System.out.println("총 대기 시간: " + elapsed + "ms"); + assertThat(elapsed).isGreaterThanOrEqualTo(0); + assertThat(elapsed).isGreaterThanOrEqualTo(maxElapsedTime - 1); + } + + @Test + @DisplayName("ExponentialBackOffWithJitter - 생성 시 초기값이 올바른지 테스트") + void 생성자_초기값_검증() { + // given + final long initialInterval = 500L; + final double multiplier = 2.0; + final long maxElapsedTime = 5000L; + + // when + ExponentialBackOffWithJitter backOff = new ExponentialBackOffWithJitter(initialInterval, multiplier, maxElapsedTime,5); + + // then + assertThat(backOff).isNotNull(); // 객체가 잘 만들어졌는지 + } + + @Test + @DisplayName("ExponentialBackOffWithJitter - 최대 대기시간이 0이면 즉시 중단되는지 테스트") + void 최대_대기시간_0이면_바로_중단() { + // given + final long initialInterval = 1000L; + final double multiplier = 2.0; + final long maxElapsedTime = 0L; // 최대 대기시간 0 + + ExponentialBackOffWithJitter backOff = new ExponentialBackOffWithJitter(initialInterval, multiplier, maxElapsedTime,5); + BackOffExecution execution = backOff.start(); + + // when + long waitTime = execution.nextBackOff(); + + // then + assertThat(waitTime).isEqualTo(BackOffExecution.STOP); + } + + @Test + @DisplayName("ExponentialBackOffWithJitter - 최대 재시도 횟수 초과 시 중단 테스트") + void 최대_재시도_횟수_초과시_STOP() { + // given + final long initialInterval = 1000L; // 1초 + final double multiplier = 2.0; + final long maxElapsedTime = 10000L; // 10초 + final int maxAttempts = 5; // 최대 5번만 재시도 + + ExponentialBackOffWithJitter backOff = new ExponentialBackOffWithJitter(initialInterval, multiplier, maxElapsedTime, maxAttempts); + BackOffExecution execution = backOff.start(); + + // when + int attempts = 0; + while (true) { + long nextBackOff = execution.nextBackOff(); + if (nextBackOff == BackOffExecution.STOP) { + break; + } + attempts++; + } + + // then + assertThat(attempts).isEqualTo(maxAttempts); + } +} \ No newline at end of file diff --git a/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventOrderingTest.java b/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventOrderingTest.java new file mode 100644 index 0000000..514ea71 --- /dev/null +++ b/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventOrderingTest.java @@ -0,0 +1,87 @@ +package com.sangyunpark.product.infrastructure.kafka; + +import com.sangyunpark.product.application.event.StockDeductedEvent; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest +@EmbeddedKafka(partitions = 3, topics = "stock.deducted") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class StockEventOrderingTest { + private static final String TOPIC = "stock.deducted"; + private static final int MESSAGE_COUNT = 100; + + @Autowired + private StockEventProducer stockEventProducer; + + @Autowired + private EmbeddedKafkaBroker embeddedKafkaBroker; + + private Consumer consumer; + + @BeforeEach + void setUp() { + Map consumerProps = KafkaTestUtils.consumerProps( + "ordering-test-group", "true", embeddedKafkaBroker + ); + consumer = new DefaultKafkaConsumerFactory<>( + consumerProps, + new StringDeserializer(), + new JsonDeserializer<>(StockDeductedEvent.class, false) + ).createConsumer(); + + embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, TOPIC); + } + + @AfterEach + void tearDown() { + consumer.close(); + } + + @Test + void 재고차감_이벤트_순서보장_검증() throws Exception { + + for (int i = 0; i < MESSAGE_COUNT; i++) { + stockEventProducer.sendStockDeductedEvent(new StockDeductedEvent((long) i, 1L, 1L)); + } + + List> consumedRecords = new ArrayList<>(); + long endTime = System.currentTimeMillis() + 5000; + + while (System.currentTimeMillis() < endTime && consumedRecords.size() < MESSAGE_COUNT) { + consumer.poll(java.time.Duration.ofMillis(500)).forEach(consumedRecords::add); + } + + assertThat(consumedRecords).hasSize(MESSAGE_COUNT); + + List orderIds = consumedRecords.stream() + .map(record -> record.value().orderId()) + .toList(); + + List expectedOrderIds = IntStream.range(0, MESSAGE_COUNT) + .mapToObj(i -> (long) i) + .toList(); + + assertThat(orderIds) + .isEqualTo(expectedOrderIds); + } +} diff --git a/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducerTest.java b/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducerTest.java index 887a4b0..8b43897 100644 --- a/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducerTest.java +++ b/product/src/test/java/com/sangyunpark/product/infrastructure/kafka/StockEventProducerTest.java @@ -1,7 +1,6 @@ package com.sangyunpark.product.infrastructure.kafka; import com.sangyunpark.product.application.event.StockDeductedEvent; -import com.sangyunpark.product.infrastructure.redis.StockRedisRepository; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -22,14 +21,11 @@ class StockEventProducerTest { @Mock private KafkaTemplate kafkaTemplate; - @Mock - private StockRedisRepository stockRedisRepository; - private StockEventProducer stockEventProducer; @BeforeEach void setUp() { - stockEventProducer = new StockEventProducer(stockRedisRepository,kafkaTemplate); + stockEventProducer = new StockEventProducer(kafkaTemplate); } @Test @@ -39,26 +35,11 @@ void setUp() { StockDeductedEvent event = new StockDeductedEvent(1L, 5L, 100L); // when - when(kafkaTemplate.send("stock.deducted", event)) + when(kafkaTemplate.send("stock.deducted", String.valueOf(event.productId()), event)) .thenReturn(CompletableFuture.completedFuture(null)); stockEventProducer.sendStockDeductedEvent(event); // then - verify(kafkaTemplate).send("stock.deducted", event); - } - - @Test - @DisplayName("Kafka 전송 실패 시 재고가 복구되는지 확인한다.") - void kafka_전송_실패시_재고_복구_확인() { - // given - StockDeductedEvent event = new StockDeductedEvent(1L, 5L, 100L); - when(kafkaTemplate.send("stock.deducted", event)) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Kafka send fail"))); - - // when - stockEventProducer.sendStockDeductedEvent(event); - - // then - verify(stockRedisRepository).increase(5L, 100L); + verify(kafkaTemplate).send("stock.deducted", String.valueOf(event.productId()), event); } } \ No newline at end of file diff --git a/product/src/test/java/com/sangyunpark/product/integration/StockEventProducerIntegrationTest.java b/product/src/test/java/com/sangyunpark/product/integration/StockEventProducerIntegrationTest.java index cd5d8a7..a55083b 100644 --- a/product/src/test/java/com/sangyunpark/product/integration/StockEventProducerIntegrationTest.java +++ b/product/src/test/java/com/sangyunpark/product/integration/StockEventProducerIntegrationTest.java @@ -24,6 +24,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; +import java.util.Collections; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -77,7 +78,7 @@ void setUp() { new JsonDeserializer<>(StockDeductedEvent.class, false) ).createConsumer(); - consumer.subscribe(java.util.Collections.singletonList(TOPIC)); + consumer.subscribe(Collections.singletonList(TOPIC)); } @Test diff --git a/product/src/test/java/com/sangyunpark/product/integration/StockIntegrationTest.java b/product/src/test/java/com/sangyunpark/product/integration/StockIntegrationTest.java index 49639bd..3e2fa05 100644 --- a/product/src/test/java/com/sangyunpark/product/integration/StockIntegrationTest.java +++ b/product/src/test/java/com/sangyunpark/product/integration/StockIntegrationTest.java @@ -9,11 +9,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.http.MediaType; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.web.servlet.MockMvc; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -47,22 +47,24 @@ public class StockIntegrationTest { private StockRedisRepository stockRedisRepository; @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")); + static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) + .withReuse(true); - @Container - static GenericContainer redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0.5")) - .withExposedPorts(6379); + @Autowired + private RedisTemplate redisTemplate; @DynamicPropertySource static void setProperties(DynamicPropertyRegistry registry) { registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); - registry.add("spring.data.redis.host", redisContainer::getHost); - registry.add("spring.data.redis.port", () -> redisContainer.getMappedPort(6379)); + registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest"); + registry.add("spring.kafka.consumer.enable-auto-commit", () -> "false"); + registry.add("spring.kafka.listener.ack-mode", () -> "manual"); } @BeforeEach void setUp() { + redisTemplate.getConnectionFactory().getConnection().flushAll(); Stock stock1 = Stock.builder().productId(1L).quantity(10L).build(); Stock stock2 = Stock.builder().productId(2L).quantity(10L).build(); Stock stock3 = Stock.builder().productId(3L).quantity(101L).build(); @@ -84,7 +86,7 @@ void setUp() { .andExpect(status().isOk()); await() - .atMost(Duration.ofSeconds(20)) + .atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { Stock stock = stockJpaRepository.findById(1L).orElseThrow(); assertEquals(7, stock.getQuantity());