Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ccb0bd0
feat: Consumer 재시도 전략 ExponentialBackOffWithJitter 클래스 구현 및 적용 (#11)
sangyunpark99 Apr 28, 2025
6bf4a3a
test: Consumer 재시도 전략 ExponentialBackOffWithJitter 클래스 테스트 코드 구현 (#11)
sangyunpark99 Apr 28, 2025
87ad012
feat: Consumer 재시도 전략 ExponentialBackOffWithJitter 최대 시도 횟수 적용 (#11)
sangyunpark99 Apr 28, 2025
53174f2
test: Consumer 재시도 전략 ExponentialBackOffWithJitter 최대 시도 횟수 적용 test 구…
sangyunpark99 Apr 28, 2025
843919f
refactor: global 폴더로 ExponentialBackOffWithJitter 클래스 이동 (#11)
sangyunpark99 Apr 29, 2025
f1086ca
refactor: global 폴더로 ExponentialBackOffWithJitter 클래스 이동 (#11)
sangyunpark99 Apr 29, 2025
7172fd7
refactor: global 폴더로 ExponentialBackOffWithJitter 클래스 이동 (#11)
sangyunpark99 Apr 29, 2025
b03ddff
feat: Redis에 상품 key가 존재하는지 확인하는 메서드 구현 (#11)
sangyunpark99 Apr 29, 2025
f71e9da
feat: Redis에 상품 재고가 존재하는지 확인하는 메서드 추가 (#11)
sangyunpark99 Apr 29, 2025
d372ce5
test: Redis에 상품 재고가 존재하는지 확인하는 메서드로 인한 테스트 코드 수정 (#11)
sangyunpark99 Apr 29, 2025
60ebcd2
fix: Kafka Consumer Redis 복구방식 개선 (#11)
sangyunpark99 Apr 29, 2025
62463fd
chore: 불필요한 kafka-test impl 제거하기 (#11)
sangyunpark99 Apr 29, 2025
9714cdf
refactor: 사용하지 않는 StockRedisRepository import문 제거 (#11)
sangyunpark99 Apr 29, 2025
c2a6d22
test: kafka Producer Event 발행 순서 테스트 구현 (#11)
sangyunpark99 Apr 29, 2025
b9f4eb6
feat: producer 실패시 해주는 Redis 재고 복구 로직 제거 (#11)
sangyunpark99 Apr 29, 2025
8159c1e
test: 사용하지 않는 stockRedisRepoistory 주입 제거 (#11)
sangyunpark99 Apr 29, 2025
e86b1ca
test: 결과 비교 횟수 수정 (#11)
sangyunpark99 Apr 30, 2025
de3bac8
test: setIfAbsent 메서드 구현 (#11)
sangyunpark99 Apr 30, 2025
ba6e14f
feat: consumer에서 진행되던 재고 중복 확인 및 저장 로직 제거 (#11)
sangyunpark99 Apr 30, 2025
a8ce10f
test: java.util.Collections를 import로 변경 (#11)
sangyunpark99 Apr 30, 2025
bfea5ba
test: 사용하지 않는 stockRedisRepository 필드 변수 삭제 (#11)
sangyunpark99 Apr 30, 2025
e7f5674
test: testContainer Kafka 설정 변경 (#11)
sangyunpark99 Apr 30, 2025
fe1b5cc
feat: 이미 처리된 주문 로직 service단에서 검증 (#11)
sangyunpark99 Apr 30, 2025
1ce1566
test: 이미 처리된 주문 로직 service단에서 검증에 따른 테스트 코드 변경 (#11)
sangyunpark99 Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions product/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
plugins {
plugins {
id 'java'
id 'org.springframework.boot' version '3.4.4'
id 'io.spring.dependency-management' version '1.1.7'
Expand Down Expand Up @@ -45,7 +45,6 @@ dependencies {

testAnnotationProcessor 'org.mapstruct:mapstruct-processor:1.5.3.Final'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'com.h2database:h2'
testImplementation 'org.testcontainers:testcontainers:1.19.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.sangyunpark.product.constant.ErrorCode;
import com.sangyunpark.product.exception.BusinessException;
import com.sangyunpark.product.infrastructure.kafka.StockEventProducer;
import com.sangyunpark.product.infrastructure.redis.OrderDuplicationRepository;
import com.sangyunpark.product.infrastructure.redis.StockRedisRepository;
import com.sangyunpark.product.infrastructure.repository.StockJpaRepository;
import lombok.RequiredArgsConstructor;
Expand All @@ -20,13 +21,19 @@ public class StockService {
private final StockJpaRepository stockJpaRepository;
private final StockRedisRepository stockRedisRepository;
private final StockEventProducer stockEventProducer;
private final OrderDuplicationRepository orderDuplicationRepository;

public void decreaseStockAndPublish(final Long productId, final Long quantity, final Long orderId) {

if(!stockJpaRepository.existsById(productId)) {
if(!orderDuplicationRepository.saveIfAbsent(orderId, Duration.ofSeconds(30L))) {
log.info("이미 처리된 주문 orderId: {}", orderId);
return;
}

if(!stockRedisRepository.isExisted(productId)) {
Long dbQuantity = stockJpaRepository.findQuantityByProductId(productId)
.orElseThrow(() -> new BusinessException(ErrorCode.PRODUCT_NOT_FOUND));
stockRedisRepository.setIfAbsentWithTTL(productId, dbQuantity, Duration.ofSeconds(30));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

나중에 admin( 판매자가 사용하는 화면에서 재고 관리할때 영향을 주지 않게 설계해야함)

stockRedisRepository.setIfAbsentWithTTL(productId, dbQuantity, Duration.ofMinutes(3));
}

final Long remainStock = stockRedisRepository.decrease(productId, quantity);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package com.sangyunpark.product.config;

import com.sangyunpark.product.application.event.StockDeductedEvent;
import com.sangyunpark.product.global.ExponentialBackOffWithJitter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Slf4j
@Configuration
public class KafkaConsumerConfig {
public class KafkaConsumerConfig {

final String DLT = ".DLT";
private final String DLT = ".DLT";
private final int MAX_RETRY_COUNT = 5;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, StockDeductedEvent> kafkaListenerContainerFactory(
Expand All @@ -33,6 +36,14 @@ public DefaultErrorHandler errorHandler(KafkaTemplate<String, StockDeductedEvent
DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new org.apache.kafka.common.TopicPartition(record.topic() + DLT, record.partition()));

return new DefaultErrorHandler(recover, new FixedBackOff(1000L, 3L));
ExponentialBackOffWithJitter backOff = new ExponentialBackOffWithJitter(1000L, 2.0, 16000L, MAX_RETRY_COUNT);

DefaultErrorHandler errorHandler = new DefaultErrorHandler(recover, backOff);

errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> {
log.warn("카프카 컨슈머 재시도 횟수 {} 기록 내용: {}", deliveryAttempt, record);
});

return errorHandler;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.sangyunpark.product.global;

import java.util.Random;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

public class ExponentialBackOffWithJitter implements BackOff {

private final double multiplier;
private final long maxElapsedTime;
private final int maxAttempts;

private long currentInterval;
private int attemptCount;
private long elapsedTime;
private final Random random = new Random();

public ExponentialBackOffWithJitter(long initialInterval, double multiplier, long maxElapsedTime, int maxAttempts) {
this.maxAttempts= maxAttempts;
this.multiplier = multiplier;
this.maxElapsedTime = maxElapsedTime;
this.currentInterval = initialInterval;
this.elapsedTime = 0;
}

@Override
public BackOffExecution start() {
return () -> {
if (elapsedTime >= maxElapsedTime || attemptCount >= maxAttempts) {
return BackOffExecution.STOP;
}

long next = currentInterval;
long jittered = (next > 0) ? random.nextLong(next + 1) : 0;

currentInterval = Math.min((long) (currentInterval * multiplier), Long.MAX_VALUE / 2);

elapsedTime += jittered;
attemptCount++;

return jittered;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import com.sangyunpark.product.application.event.StockDeductedEvent;
import com.sangyunpark.product.constant.ErrorCode;
import com.sangyunpark.product.exception.BusinessException;
import com.sangyunpark.product.infrastructure.redis.OrderDuplicationRepository;
import com.sangyunpark.product.infrastructure.redis.StockRedisRepository;
import com.sangyunpark.product.infrastructure.repository.StockJpaRepository;
import jakarta.transaction.Transactional;
import lombok.RequiredArgsConstructor;
Expand All @@ -21,24 +19,14 @@ public class StockEventConsumer {
private final String GROUP_ID = "product-service";

private final StockJpaRepository stockJpaRepository;
private final OrderDuplicationRepository orderDuplicationRepository;
private final StockRedisRepository stockRedisRepository;

@Transactional
@KafkaListener(topics = TOPIC, groupId = GROUP_ID)
public void consumeStockDeductedEvent(final StockDeductedEvent event) {

if(orderDuplicationRepository.isAlreadyProcessed(event.orderId())) {
log.info("이미 처리된 주문 orderId: {}", event.orderId());
stockRedisRepository.increase(event.productId(), event.quantity());
return;
}

int updatedRows = stockJpaRepository.decreaseStock(event.productId(), event.quantity());
if(updatedRows == 0) {
throw new BusinessException(ErrorCode.STOCK_NOT_ENOUGH);
}

orderDuplicationRepository.saveProcessed(event.orderId());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.sangyunpark.product.infrastructure.kafka;

import com.sangyunpark.product.application.event.StockDeductedEvent;
import com.sangyunpark.product.infrastructure.redis.StockRedisRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
Expand All @@ -13,15 +12,11 @@
public class StockEventProducer {

private final String TOPIC = "stock.deducted";

private final StockRedisRepository stockRedisRepository;
private final KafkaTemplate<String, StockDeductedEvent> kafkaTemplate;

public void sendStockDeductedEvent(final StockDeductedEvent stockDeductedEvent) {

kafkaTemplate.send(TOPIC, stockDeductedEvent)
kafkaTemplate.send(TOPIC, String.valueOf(stockDeductedEvent.productId()), stockDeductedEvent)
.exceptionally(ex -> {
stockRedisRepository.increase(stockDeductedEvent.productId(), stockDeductedEvent.quantity());
log.error("Kafka 메시지 전송 실패로 재고 복구 수행");
return null;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package com.sangyunpark.product.infrastructure.kafka.dlt;

import com.sangyunpark.product.application.event.StockDeductedEvent;
import com.sangyunpark.product.infrastructure.redis.StockRedisRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class StockDLTListener {

private final StockRedisRepository stockRedisRepository;

@KafkaListener(topics = "stock.deducted.DLT", groupId = "product-service-dlt")
public void listenStockDLT(final StockDeductedEvent event) {
log.error("DLT 수신 - event: {}", event);
log.error("Dead Letter 수신 - 재고 복구 진행. event: {}", event);
stockRedisRepository.increase(event.productId(), event.quantity());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ public class OrderDuplicationRepository {
private final StringRedisTemplate stringRedisTemplate;
private final String PREFIX = "stock:deducted:";

public boolean isAlreadyProcessed(final Long orderId) {
return Boolean.TRUE.equals(stringRedisTemplate.hasKey(PREFIX + orderId));
private String getKey(final Long orderId) {
return PREFIX + orderId;
}

public void saveProcessed(final Long orderId) {
stringRedisTemplate.opsForValue().set(PREFIX + orderId, "done", Duration.ofMinutes(10));
public boolean saveIfAbsent(final Long orderId, final Duration ttl) {
return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(getKey(orderId), "done", ttl));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public Long decrease(final Long productId, final Long amount) {
);
}

public boolean isExisted(final Long productId) {
String key = getKey(productId);
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}

public void increase(final Long productId, final Long amount) {
redisTemplate.opsForValue().increment(getKey(productId), amount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.sangyunpark.product.constant.ErrorCode;
import com.sangyunpark.product.exception.BusinessException;
import com.sangyunpark.product.infrastructure.kafka.StockEventProducer;
import com.sangyunpark.product.infrastructure.redis.OrderDuplicationRepository;
import com.sangyunpark.product.infrastructure.redis.StockRedisRepository;
import com.sangyunpark.product.infrastructure.repository.StockJpaRepository;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -13,11 +14,11 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Duration;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -37,22 +38,29 @@ class StockServiceTest {
@Mock
private StockEventProducer stockEventProducer;

@Mock
private OrderDuplicationRepository orderDuplicationRepository;

private final String ERROR_CODE = "errorCode";

@Test
@DisplayName("Redis 캐시 없음 → DB에서 조회 후 캐시에 저장 + 재고 차감 성공")
void 캐시없음_정상차감() {
// given
Long productId = 1L, orderId = 100L, quantity = 2L;
given(stockJpaRepository.existsById(productId)).willReturn(false);
Duration duplicationTtl = Duration.ofSeconds(30L);
Duration cacheTtl = Duration.ofMinutes(3L);

given(orderDuplicationRepository.saveIfAbsent(orderId, duplicationTtl)).willReturn(true);
given(stockRedisRepository.isExisted(productId)).willReturn(false); //
given(stockJpaRepository.findQuantityByProductId(productId)).willReturn(Optional.of(10L));
given(stockRedisRepository.decrease(productId, quantity)).willReturn(8L);

// when
stockService.decreaseStockAndPublish(productId, quantity, orderId);

// then
verify(stockRedisRepository).setIfAbsentWithTTL(eq(productId), eq(10L), any());
verify(stockRedisRepository).setIfAbsentWithTTL(productId, 10L, cacheTtl); //
verify(stockEventProducer).sendStockDeductedEvent(
new StockDeductedEvent(orderId, productId, quantity)
);
Expand All @@ -63,14 +71,17 @@ class StockServiceTest {
void 캐시있음_정상차감() {
// given
Long productId = 2L, orderId = 200L, quantity = 3L;
given(stockJpaRepository.existsById(productId)).willReturn(true);
Duration duration = Duration.ofSeconds(30L);

given(orderDuplicationRepository.saveIfAbsent(orderId, duration)).willReturn(true); // ✅ 고쳐야 할 부분
given(stockRedisRepository.isExisted(productId)).willReturn(true);
given(stockRedisRepository.decrease(productId, quantity)).willReturn(7L);

// when
stockService.decreaseStockAndPublish(productId, quantity, orderId);

// then
verify(stockRedisRepository, never()).setIfAbsentWithTTL(any(), any(), any());
verify(stockRedisRepository, never()).setIfAbsentWithTTL(any(), any(), any()); // 캐시가 이미 존재하므로 호출 X
verify(stockEventProducer).sendStockDeductedEvent(
new StockDeductedEvent(orderId, productId, quantity)
);
Expand All @@ -81,7 +92,9 @@ class StockServiceTest {
void 재고부족_예외() {
// given
Long productId = 3L, orderId = 300L, quantity = 20L;
given(stockJpaRepository.existsById(productId)).willReturn(true);

given(orderDuplicationRepository.saveIfAbsent(orderId, Duration.ofSeconds(30L))).willReturn(true);
given(stockRedisRepository.isExisted(productId)).willReturn(true);
given(stockRedisRepository.decrease(productId, quantity)).willReturn(-1L);

// expect
Expand All @@ -95,7 +108,9 @@ class StockServiceTest {
void 상품없음_예외() {
// given
Long productId = 99L, orderId = 400L, quantity = 1L;
given(stockJpaRepository.existsById(productId)).willReturn(false);

given(orderDuplicationRepository.saveIfAbsent(orderId, Duration.ofSeconds(30L))).willReturn(true);
given(stockRedisRepository.isExisted(productId)).willReturn(false);
given(stockJpaRepository.findQuantityByProductId(productId)).willReturn(Optional.empty());

// expect
Expand Down
Loading
Loading