
Kafka Adoption Improvement Report

📋 Table of Contents

  1. Kafka Overview
  2. Background and Rationale
  3. System Architecture
  4. Key Improvements
  5. Technical Details
  6. Monitoring and Operations
  7. Future Extensibility

πŸ” Kafka κ°œμš”

What Is Kafka?

Apache Kafka is a distributed event streaming platform. It is a message broker that reliably handles large volumes of real-time data.

Key Features

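  • High throughput: can process millions of messages per second on an adequately sized cluster
  • Scalability: scales horizontally by adding brokers to the cluster
  • Durability: messages are persisted to disk (and replicated), which protects against message loss
  • Distributed processing: partitioning enables parallel processing, and replication provides high availability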

Core Concepts

κ°œλ… μ„€λͺ…
Producer λ©”μ‹œμ§€λ₯Ό Kafka 토픽에 λ°œν–‰ν•˜λŠ” 주체
Consumer Kafka ν† ν”½μ˜ λ©”μ‹œμ§€λ₯Ό κ΅¬λ…ν•˜μ—¬ μ†ŒλΉ„ν•˜λŠ” 주체
Topic λ©”μ‹œμ§€λ₯Ό λΆ„λ₯˜ν•˜λŠ” 논리적 채널
Partition 토픽을 λ‚˜λˆˆ 물리적 λ‹¨μœ„, 병렬 처리 κ°€λŠ₯
Broker Kafka μ„œλ²„ μΈμŠ€ν„΄μŠ€
Consumer Group ν˜‘λ ₯ν•˜μ—¬ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λŠ” Consumer κ·Έλ£Ή
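The link between keys, partitions, and ordering can be shown with a small plain-Java model. This is a simplification, not Kafka's partitioner. Kafka's default partitioner hashes the serialized key with murmur2, while this sketch uses `String.hashCode` for brevity. The class name `PartitionSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: maps record keys to partitions (Kafka itself uses murmur2, not hashCode). */
final class PartitionSketch {
    private final int partitionCount;
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionSketch(int partitionCount) {
        this.partitionCount = partitionCount;
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Same key -> same partition; Math.floorMod keeps the index non-negative. */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Appends the value to its key's partition; order is therefore preserved per key. */
    int send(String key, String value) {
        int p = partitionFor(key);
        partitions.get(p).add(value);
        return p;
    }

    /** Returns the values of one partition in append order. */
    List<String> read(int partition) {
        return List.copyOf(partitions.get(partition));
    }
}
```

Two messages sent with the key `"123"` always land in the same partition. A consumer of that partition therefore reads them in the order they were sent. Messages with different keys may be spread across partitions and processed in parallel.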

πŸ’‘ 적용 λ°°κ²½ 및 이유

Problem

κΈ°μ‘΄ μ‹œμŠ€ν…œμ—μ„œλŠ” μ£Όλ¬Έ 결제 μ™„λ£Œ μ‹œ Spring Event 기반으둜 이벀트 λ¦¬μŠ€λ„ˆκ°€ μ™ΈλΆ€ λ‘œκΉ… μ‹œμŠ€ν…œμ„ 직접 ν˜ΈμΆœν•˜μ—¬ 이벀트λ₯Ό μ „μ†‘ν–ˆμŠ΅λ‹ˆλ‹€.

// Previous approach (Spring Event + direct HTTP call)
OrderService.processPayment()
  └─> Publish Spring Event (OrderPaidEvent)
      └─> OrderEventListener (@Async)
          └─> ExternalLoggingClient.sendLog()  // direct HTTP call to the external system
              ├─> Success: processed normally
              └─> Failure: event lost (only an error log is written)

Issues

  1. μ• ν”Œλ¦¬μΌ€μ΄μ…˜ λ‚΄λΆ€ κ²°ν•©: 동일 JVM λ‚΄λΆ€μ—μ„œλ§Œ λ™μž‘, λ‹€λ₯Έ μ„œλΉ„μŠ€λ‘œ ν™•μž₯ λΆˆκ°€
  2. κ°•ν•œ κ²°ν•©: 이벀트 λ¦¬μŠ€λ„ˆκ°€ μ™ΈλΆ€ λ‘œκΉ… μ‹œμŠ€ν…œκ³Ό 직접 μ—°κ²°
  3. λ©”μ‹œμ§€ μœ μ‹€ μœ„ν—˜: μ™ΈλΆ€ μ‹œμŠ€ν…œ μž₯μ•  μ‹œ 이벀트 손싀 (μž¬μ‹œλ„ μ—†μŒ)
  4. μž₯μ•  좔적 어렀움: μ‹€νŒ¨ν•œ μ΄λ²€νŠΈμ— λŒ€ν•œ 좔적 및 재처리 λΆˆκ°€

Solution: Introducing Kafka

We introduced Kafka as a message broker and moved to an event-driven architecture.

Expected Benefits

| Benefit | Description |
| --- | --- |
| Asynchronous processing | Order processing and logging are decoupled, which shortens response time |
| Fault isolation | Failures in the external system do not affect business logic |
| Extensibility | New features (notifications, analytics, etc.) can be added just by adding consumers |
| Reliability | Message persistence and retries prevent data loss |

πŸ—οΈ μ‹œμŠ€ν…œ ꡬ성

Kafka Flow (Payment Completion)

[Figure: Kafka sequence diagram]

Note: coupon issuance does not use Kafka; it keeps using Redis Streams.

  • Coupon issuance puts more weight on consistency and immediate processing speed, so we kept the in-memory Redis Streams.
  • We also did not implement reprocessing. Issuance is a one-shot, first-come competition, so we judged retries to be meaningless.

Components

1. Producer Side (Order Service)

OrderService
  └─> On payment completion, publishes a Spring Event (handled after the transaction commits)
      └─> OrderEventPublisher
          └─> OrderEventListener (async)
              └─> OrderKafkaProducer
                  └─> Publishes to Kafka topic "order-paid-events"

Key classes:

  • OrderService: handles the order business logic
  • OrderEventPublisher: publishes events through Spring's ApplicationEventPublisher
  • OrderEventListener: uses @TransactionalEventListener so that it runs only after the transaction commits
  • OrderKafkaProducer: Kafka producer implementation
  • OrderKafkaMessage: message DTO
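The "after commit" guarantee that `@TransactionalEventListener` provides (its default phase is `AFTER_COMMIT`) can be modeled without Spring. Events raised inside a transaction are buffered, handed to the listener only if the transaction commits, and discarded on rollback. This is a framework-free sketch, and `TransactionSketch` and its methods are hypothetical. In the application, Spring does the buffering, and `@Async` moves the listener onto another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of AFTER_COMMIT event delivery, as done by Spring for @TransactionalEventListener. */
final class TransactionSketch<E> {
    private final List<E> pending = new ArrayList<>();
    private final Consumer<E> afterCommitListener;

    TransactionSketch(Consumer<E> afterCommitListener) {
        this.afterCommitListener = afterCommitListener;
    }

    /** Called inside the transaction, e.g. when the payment completes. */
    void publish(E event) {
        pending.add(event);
    }

    /** On commit, buffered events are delivered (in the real flow, to the Kafka producer). */
    void commit() {
        pending.forEach(afterCommitListener);
        pending.clear();
    }

    /** On rollback, buffered events are dropped, so nothing is sent for an order that was not paid. */
    void rollback() {
        pending.clear();
    }
}
```

This ordering is what prevents a Kafka message from being published for a payment whose database transaction later rolls back.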

2. Kafka ν΄λŸ¬μŠ€ν„°

Brokers:
  - localhost:19092 (broker-1)
  - localhost:29092 (broker-2)
  - localhost:39092 (broker-3)

Topics:
  - order-paid-events: 3 partitions, replication factor 3
  - order-paid-events.DLQ: Dead Letter Queue
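With `acks=all`, a write succeeds only while the partition's in-sync replica set has at least `min.insync.replicas` members. That determines how many broker failures this 3-replica topic can absorb and still accept writes. The report does not state `min.insync.replicas`. The sketch assumes 2, a common pairing with a replication factor of 3 (Kafka's broker default is 1). `ReplicationSketch` is a hypothetical helper.

```java
/** Hypothetical helper: broker failures a partition tolerates while still accepting acks=all writes. */
final class ReplicationSketch {
    private ReplicationSketch() {}

    /** Writes keep succeeding while (replicationFactor - failedBrokers) >= minInsyncReplicas. */
    static int writeTolerance(int replicationFactor, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException("min.insync.replicas must be in [1, replication factor]");
        }
        return replicationFactor - minInsyncReplicas;
    }
}
```

Under the assumed `min.insync.replicas=2`, one of the three brokers can be down without blocking producers. A second failure would make `acks=all` sends fail, and in this design those messages would go to the fallback table.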

3. Consumer Side (Logging Service)

Kafka topic "order-paid-events"
  └─> LoggingKafkaConsumer
      └─> Receives and deserializes the message
          └─> ExternalLoggingClient
              └─> External logging system (http://localhost:3000/logs)

Key classes:

  • LoggingKafkaConsumer: consumes the topic via @KafkaListener
  • ExternalLoggingClient: RestTemplate-based HTTP client
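The report's `ExternalLoggingClient` is RestTemplate-based. As a dependency-free illustration, the same call can be written with the JDK's `java.net.http` API. The class name `LoggingRequestSketch`, the pre-serialized JSON string, and the `Content-Type` header are assumptions, not the project's code. The request can be built and inspected without the logging server running.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical stand-in for ExternalLoggingClient, using java.net.http instead of RestTemplate. */
final class LoggingRequestSketch {
    static final URI LOGS_ENDPOINT = URI.create("http://localhost:3000/logs");

    private LoggingRequestSketch() {}

    /** Builds a POST carrying the already-serialized JSON payload. */
    static HttpRequest buildRequest(String jsonPayload) {
        return HttpRequest.newBuilder(LOGS_ENDPOINT)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
                .build();
    }

    /**
     * Sends the request. I/O errors and non-2xx responses are thrown, not swallowed,
     * so the consumer's error handler (retry + DLQ) can see the failure.
     */
    static void send(HttpClient client, String jsonPayload) throws IOException, InterruptedException {
        HttpResponse<Void> response =
                client.send(buildRequest(jsonPayload), HttpResponse.BodyHandlers.discarding());
        if (response.statusCode() / 100 != 2) {
            throw new IOException("Logging server returned HTTP " + response.statusCode());
        }
    }
}
```

The important design point, whichever HTTP client is used, is that `send` reports failure by throwing. The consumer-side retry and DLQ handling depend on that.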

Message Format

{
  "eventType": "ORDER_PAID",
  "orderId": 123,
  "userId": 456,
  "paidAt": "2025-12-19T10:30:00",
  "orderItems": [
    {
      "productId": 1,
      "orderQuantity": 2
    }
  ]
}
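The message format above maps naturally onto Java records. The report does not show `OrderKafkaMessage`, so the following are assumptions:

  • the record layout
  • the `OrderItem` name
  • using `orderId` as the Kafka message key

Keying by `orderId` would keep all events for one order in the same partition.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/** Hypothetical shape of one line item in the payload. */
record OrderItem(long productId, int orderQuantity) {}

/** Hypothetical record mirroring the JSON message format. */
record OrderKafkaMessage(String eventType, long orderId, long userId,
                         LocalDateTime paidAt, List<OrderItem> orderItems) {

    OrderKafkaMessage {
        orderItems = List.copyOf(orderItems); // defensive, immutable copy
    }

    static OrderKafkaMessage orderPaid(long orderId, long userId, LocalDateTime paidAt, List<OrderItem> items) {
        return new OrderKafkaMessage("ORDER_PAID", orderId, userId, paidAt, items);
    }

    /** Assumed message key: the order ID, so one order's events share a partition. */
    String messageKey() {
        return String.valueOf(orderId);
    }

    /** Timestamp as it appears in the JSON payload, e.g. 2025-12-19T10:30:00. */
    String paidAtIso() {
        return paidAt.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
    }
}
```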

✨ μ£Όμš” κ°œμ„  사항

1. Producer μž¬μ‹œλ„ 및 λ©±λ“±μ„± 보μž₯

Settings (application.properties):

# Wait for acknowledgment from all in-sync replicas
spring.kafka.producer.acks=all
# Enable idempotence (no duplicates from producer retries)
spring.kafka.producer.properties.enable.idempotence=true
# Retry up to 3 times
spring.kafka.producer.retries=3
# Wait 1 second between retries
spring.kafka.producer.properties.retry.backoff.ms=1000

Effects:

  • μΌμ‹œμ  λ„€νŠΈμ›Œν¬ 였λ₯˜ μžλ™ 볡ꡬ
  • λ©”μ‹œμ§€ 쀑볡 λ°©μ§€ (λ©±λ“±μ„±)
  • λ©”μ‹œμ§€ μˆœμ„œ 보μž₯
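Idempotence works because the broker tracks, per producer ID and partition, the sequence number of the last batch it appended. It drops a retried batch whose sequence it has already seen. The model below deliberately simplifies this to one partition, one record per batch, and no producer epochs. `IdempotentLogSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of broker-side de-duplication for an idempotent producer. */
final class IdempotentLogSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();

    /** Returns true if appended, false if the record was recognized as a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate: the ack was lost and the producer resent the same batch
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        log.add(value);
        lastSequenceByProducer.put(producerId, sequence);
        return true;
    }

    List<String> contents() {
        return List.copyOf(log);
    }
}
```

This de-duplication only covers retries made by the Kafka client within one producer session. A message that is re-sent later by application code, such as the fallback scheduler, is a new send with a new sequence number. Consumers may therefore still see an occasional duplicate.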

2. Fallback DB Storage Mechanism

When publishing to Kafka fails, the message is saved to the database, and republishing is attempted periodically.

μ•„ν‚€ν…μ²˜

OrderKafkaProducer
  └─> Attempt to publish to Kafka
      ├─> [Success] Write a log entry
      └─> [Failure] KafkaFallbackService
          └─> Save to the kafka_fallback_message table
              └─> KafkaFallbackScheduler (runs every minute)
                  └─> Attempt to republish
                      ├─> Success: status = PUBLISHED
                      └─> Failure: retry after exponential backoff
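The producer-side branch of this flow (publish, and on failure hand the message to the fallback store) can be sketched with `CompletableFuture`. That is also the type `KafkaTemplate.send` returns in Spring Kafka 3.x. The `Sender` and `FallbackStore` interfaces and the `FallbackPublishSketch` class are hypothetical stand-ins for `KafkaTemplate` and `KafkaFallbackService`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for KafkaTemplate.send(topic, key, value). */
interface Sender {
    CompletableFuture<Void> send(String topic, String key, String payload);
}

/** Hypothetical stand-in for KafkaFallbackService.saveFallbackMessage(...). */
interface FallbackStore {
    void save(String topic, String key, String payload, String errorMessage);
}

final class FallbackPublishSketch {
    private final Sender sender;
    private final FallbackStore fallback;

    FallbackPublishSketch(Sender sender, FallbackStore fallback) {
        this.sender = sender;
        this.fallback = fallback;
    }

    /** Publishes; if the send fails (after the client's own retries), the message is persisted. */
    CompletableFuture<Void> publish(String topic, String key, String payload) {
        CompletableFuture<Void> future;
        try {
            future = sender.send(topic, key, payload);
        } catch (RuntimeException e) {
            // send() can also fail synchronously, e.g. on a serialization error
            future = CompletableFuture.failedFuture(e);
        }
        return future.whenComplete((ignored, error) -> {
            if (error != null) {
                fallback.save(topic, key, payload, String.valueOf(error.getMessage()));
            }
        });
    }
}
```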

μ£Όμš” μ»΄ν¬λ„ŒνŠΈ

KafkaFallbackMessage entity:

| Field | Description |
| --- | --- |
| topic | Kafka topic name |
| messageKey | Message key |
| payload | JSON message body |
| retryCount | Current retry count |
| maxRetry | Maximum number of retries (default: 3) |
| status | PENDING / PUBLISHED / FAILED |
| nextRetryAt | Time of the next retry |
| errorMessage | Reason for the failure |

KafkaFallbackService:

  • saveFallbackMessage(): saves a fallback message to the DB
  • retryPendingMessages(): loads messages that are due for retry and republishes them
  • getStats(): returns statistics (counts of PENDING, PUBLISHED, and FAILED messages)

KafkaFallbackScheduler:

  • Every minute: attempts to republish pending messages
  • Every hour: logs statistics

μž¬μ‹œλ„ μ „λž΅

  • Exponential backoff: 1, 2, 4, 8 minutes (2^n minutes)
  • Maximum retries: 3
  • Final state: PUBLISHED (success) / FAILED (retries exhausted; requires manual handling)

3. Consumer μž¬μ‹œλ„ + DLQ (Dead Letter Queue)

When the consumer fails to process a message, it is retried automatically. A message that still fails after the retries is sent to the DLQ.

Configuration (KafkaConsumerConfig)

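import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;

@Bean
public CommonErrorHandler errorHandler(KafkaTemplate<?, ?> kafkaTemplate) {
    // Exponential backoff: 1 s, 2 s, 4 s (at most 3 retries after the first attempt)
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
    backOff.setInitialInterval(1000);   // first retry after 1 s
    backOff.setMultiplier(2.0);         // double the wait on each retry
    backOff.setMaxInterval(10000);      // never wait longer than 10 s

    // DLQ recoverer: sends failed messages to [original topic].DLQ
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate, ...);

    return new DefaultErrorHandler(recoverer, backOff);
}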
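The delays in the comment follow a simple rule. Each interval is the previous one times the multiplier, capped at `maxInterval`, for `maxRetries` retries. This plain-Java helper (`BackOffSketch`, hypothetical) reproduces that rule so the numbers can be checked. It is not Spring's `ExponentialBackOff` class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper reproducing the delay sequence of a capped exponential backoff. */
final class BackOffSketch {
    private BackOffSketch() {}

    static List<Long> delays(long initialMs, double multiplier, long maxIntervalMs, int maxRetries) {
        List<Long> result = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < maxRetries; i++) {
            result.add(Math.min((long) current, maxIntervalMs)); // cap each wait at maxInterval
            current *= multiplier;
        }
        return result;
    }
}
```

For the configured values, `delays(1000, 2.0, 10000, 3)` gives 1000, 2000, and 4000 ms. A failing record is therefore delivered at most 4 times (1 attempt + 3 retries) and waits about 7 seconds in total before it goes to the DLQ. The 10-second cap only matters from the fifth retry onward.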

Processing Flow

LoggingKafkaConsumer
  └─> externalLoggingClient.sendLog()
      β”œβ”€> [성곡] ACK
      └─> [μ‹€νŒ¨] DefaultErrorHandler
          β”œβ”€> μž¬μ‹œλ„ 1 (1초 ν›„)
          β”œβ”€> μž¬μ‹œλ„ 2 (2초 ν›„)
          β”œβ”€> μž¬μ‹œλ„ 3 (4초 ν›„)
          └─> [μ΅œμ’… μ‹€νŒ¨]
              └─> DeadLetterPublishingRecoverer
                  └─> order-paid-events.DLQ ν† ν”½μœΌλ‘œ 전솑
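The same flow can be reduced to plain Java. The handler retries a failing record with the configured delays. Once the retries are exhausted, it hands the record to a recoverer that publishes to [original topic].DLQ. `RetryThenDlqSketch`, the in-memory DLQ list, and the injected sleeper are hypothetical. In the application, `DefaultErrorHandler` and `DeadLetterPublishingRecoverer` do this work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Hypothetical model of "retry with backoff, then dead-letter" for a single record. */
final class RetryThenDlqSketch {
    record DeadLetter(String topic, String payload, String error) {}

    private final List<Long> delaysMs;
    private final LongConsumer sleeper; // injected so callers can avoid real waiting
    final List<DeadLetter> dlq = new ArrayList<>();

    RetryThenDlqSketch(List<Long> delaysMs, LongConsumer sleeper) {
        this.delaysMs = delaysMs;
        this.sleeper = sleeper;
    }

    /** Returns the number of delivery attempts made for this record. */
    int handle(String topic, String payload, Consumer<String> listener) {
        int attempts = 0;
        RuntimeException last = null;
        for (int i = 0; i <= delaysMs.size(); i++) {
            if (i > 0) {
                sleeper.accept(delaysMs.get(i - 1)); // wait before each retry: 1 s, 2 s, 4 s
            }
            attempts++;
            try {
                listener.accept(payload); // e.g. externalLoggingClient.sendLog(message)
                return attempts;          // success: the offset can be committed
            } catch (RuntimeException e) {
                last = e;
            }
        }
        dlq.add(new DeadLetter(topic + ".DLQ", payload, last.getMessage()));
        return attempts;
    }
}
```

Note that while a record is being retried, the partition it came from is not consumed further. This is acceptable for delays of a few seconds but would argue for shorter backoffs if the logging server were slow to recover.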

Changes

Before:

catch (Exception e) {
    // Only logs the error and stops (the message can be lost)
    log.error("Failed to process...", e);
}

After:

// try-catch removed → the ErrorHandler handles failures automatically
externalLoggingClient.sendLog(message);

4. Summary of Improvements

| Area | Before | After |
| --- | --- | --- |
| Message loss | Lost when publishing fails | Prevented by fallback DB + republishing |
| Producer retries | None | 3 automatic retries + idempotence |
| Consumer retries | Error logged only | 3 automatic retries + DLQ |
| Fault isolation | External system failures affected orders | Kafka acts as a buffer; fully decoupled |
| Monitoring | Difficult | Kafka UI + fallback statistics + DLQ |

πŸ”§ 기술적 세뢀사항

Producer Settings

# Reliability settings
# Wait for acknowledgment from all in-sync replicas
spring.kafka.producer.acks=all
# Prevent duplicates caused by producer retries
spring.kafka.producer.properties.enable.idempotence=true
# Retry up to 3 times
spring.kafka.producer.retries=3
# Interval between retries (ms)
spring.kafka.producer.properties.retry.backoff.ms=1000
# Timeout for a single request (ms)
spring.kafka.producer.properties.request.timeout.ms=30000
# Upper bound on the total time to deliver a record, including retries (ms)
spring.kafka.producer.properties.delivery.timeout.ms=120000
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
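Kafka documents several constraints among these settings:

  • idempotence requires `acks=all`
  • idempotence requires `retries > 0`
  • idempotence requires `max.in.flight.requests.per.connection <= 5`
  • `delivery.timeout.ms` must be at least `linger.ms + request.timeout.ms`

A quick check over `java.util.Properties` (using the Kafka client key names) might look like this. `ProducerConfigCheck` is hypothetical. Missing keys fall back to Kafka 3.x client defaults. `linger.ms` is not set in the report, and its default differs between Kafka versions, so the sketch assumes 5 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical sanity check for producer settings, keyed by Kafka client property names. */
final class ProducerConfigCheck {
    private ProducerConfigCheck() {}

    static List<String> problems(Properties p) {
        List<String> issues = new ArrayList<>();
        boolean idempotent = Boolean.parseBoolean(p.getProperty("enable.idempotence", "true"));
        String acks = p.getProperty("acks", "all");
        int retries = Integer.parseInt(p.getProperty("retries", "2147483647"));
        int inFlight = Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5"));
        long requestTimeout = Long.parseLong(p.getProperty("request.timeout.ms", "30000"));
        long deliveryTimeout = Long.parseLong(p.getProperty("delivery.timeout.ms", "120000"));
        long linger = Long.parseLong(p.getProperty("linger.ms", "5")); // assumed default

        if (idempotent && !(acks.equals("all") || acks.equals("-1"))) {
            issues.add("idempotence requires acks=all");
        }
        if (idempotent && retries <= 0) {
            issues.add("idempotence requires retries > 0");
        }
        if (idempotent && inFlight > 5) {
            issues.add("idempotence requires max.in.flight.requests.per.connection <= 5");
        }
        if (deliveryTimeout < linger + requestTimeout) {
            issues.add("delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        return issues;
    }
}
```

The settings above pass all four checks. For example, 120000 ms leaves room for several 30000 ms request attempts.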

Consumer Settings

# Basic settings
spring.kafka.consumer.group-id=logging-consumer-group
# 3 concurrent listener threads
spring.kafka.listener.concurrency=3

# Retry settings
kafka.consumer.retry.max-attempts=3
kafka.consumer.retry.backoff.initial-interval=1000
kafka.consumer.retry.backoff.multiplier=2.0
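With `concurrency=3` and the topic's 3 partitions, each listener thread ends up owning one partition. Threads beyond the partition count would sit idle. The round-robin assignment below simplifies Kafka's assignors, since the real result depends on the configured assignment strategy. `AssignmentSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin assignment of partitions to the consumers in one group. */
final class AssignmentSketch {
    private AssignmentSketch() {}

    /** Returns, for each consumer, the partitions it reads; every partition goes to exactly one consumer. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    /** Consumers that receive no partition at all. */
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }
}
```

For example, running two instances of the application with `concurrency=3` would put six consumers in `logging-consumer-group`, and three of them would receive no partitions. Scaling consumers beyond 3 would therefore also require more partitions.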

Topic Settings

# 3 partitions
kafka.topic.common.partitions=3
# Replication factor 3
kafka.topic.common.replications=3
kafka.topic.order-paid=order-paid-events

Key File Structure

src/main/java/com/example/ecommerceapi/
├── order/
│   ├── application/
│   │   ├── service/OrderService.java
│   │   └── event/OrderEventPublisher.java
│   ├── domain/
│   │   └── event/OrderPaidEvent.java
│   └── infrastructure/
│       └── kafka/
│           ├── producer/OrderKafkaProducer.java
│           └── dto/OrderKafkaMessage.java
└── common/infrastructure/
    ├── kafka/
    │   ├── config/
    │   │   ├── KafkaTopicConfig.java
    │   │   └── KafkaConsumerConfig.java
    │   ├── consumer/LoggingKafkaConsumer.java
    │   ├── entity/KafkaFallbackMessage.java
    │   ├── repository/KafkaFallbackMessageRepository.java
    │   ├── service/KafkaFallbackService.java
    │   └── scheduler/KafkaFallbackScheduler.java
    └── external/
        ├── client/ExternalLoggingClient.java
        └── listener/ExternalLoggingEventListener.java

πŸ“Š λͺ¨λ‹ˆν„°λ§ 및 운영

1. Kafka UI (http://localhost:8081)

What to check:

  • Topics: inspect messages in order-paid-events and order-paid-events.DLQ
  • Consumer Groups: check the state and lag of logging-consumer-group
  • Brokers: check cluster health
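Consumer lag, as shown in Kafka UI, is computed per partition as the log-end offset minus the group's committed offset. A total that keeps growing usually means the consumer (here, the external logging call) cannot keep up. `LagSketch` is hypothetical. For simplicity, it counts a partition with no committed offset from offset 0.

```java
import java.util.Map;

/** Hypothetical helper: total lag of a consumer group across partitions. */
final class LagSketch {
    private LagSketch() {}

    /** Sum over partitions of (logEndOffset - committedOffset); a missing commit counts from 0 here. */
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committed);
        }
        return total;
    }
}
```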

2. Fallback DB Monitoring

-- Pending messages (scheduled for republishing)
SELECT * FROM kafka_fallback_message WHERE status = 'PENDING';

-- Failed messages (require manual handling)
SELECT * FROM kafka_fallback_message WHERE status = 'FAILED';

-- Statistics
SELECT status, COUNT(*)
FROM kafka_fallback_message
GROUP BY status;

3. Log Checks

# Fallback save logs
grep "Saved fallback message" logs/application.log

# Successful fallback republishing
grep "Fallback message published successfully" logs/application.log

# DLQ publishing logs
grep "Publishing failed message to DLQ" logs/application.log

# Consumer μž¬μ‹œλ„ 둜그
grep "Retrying message" logs/application.log

4. Scheduler Statistics Log

[2025-12-19 10:00:00] Fallback message stats - pending: 2, published: 45, failed: 1
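The hourly statistics line can be produced by counting fallback rows per status, the in-memory equivalent of the GROUP BY query above. The message text is copied from the sample log line. The class name `FallbackStatsSketch` and the list-of-statuses input are assumptions about how `getStats()` might work.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical formatting of the scheduler's hourly fallback statistics. */
final class FallbackStatsSketch {
    enum Status { PENDING, PUBLISHED, FAILED }

    private FallbackStatsSketch() {}

    /** Counts rows per status, like SELECT status, COUNT(*) ... GROUP BY status. */
    static Map<Status, Long> count(List<Status> statuses) {
        Map<Status, Long> counts = new EnumMap<>(Status.class);
        for (Status s : Status.values()) {
            counts.put(s, 0L); // report zero for statuses that have no rows
        }
        statuses.forEach(s -> counts.merge(s, 1L, Long::sum));
        return counts;
    }

    static String logLine(Map<Status, Long> c) {
        return String.format("Fallback message stats - pending: %d, published: %d, failed: %d",
                c.get(Status.PENDING), c.get(Status.PUBLISHED), c.get(Status.FAILED));
    }
}
```

Unlike the SQL query, this always reports all three statuses, which keeps the log line's shape stable even when, for example, no message has failed.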

πŸ“ κ²°λ‘ 

Kafka λ„μž…μ„ 톡해 이벀트 기반 μ•„ν‚€ν…μ²˜λ‘œ μ „ν™˜ν•˜κ³ , Fallback DB + DLQ λ©”μ»€λ‹ˆμ¦˜μ„ κ΅¬ν˜„ν•˜μ—¬ λ©”μ‹œμ§€ μœ μ‹€μ„ λ°©μ§€ν–ˆμŠ΅λ‹ˆλ‹€.

Key Outcomes

  1. Business logic fully decoupled from external systems → fault isolation
  2. Three layers of safeguards → producer retries + fallback DB + consumer retries
  3. Message loss addressed → every identified failure case has a handling path (FAILED rows and DLQ messages are kept for manual handling)
  4. Extensible architecture → new features can be added just by adding consumers

As a result, we built a reliable and extensible event-driven system.