Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package transentia

import org.gradle.api.Plugin
import org.gradle.api.Project
import org.gradle.kotlin.dsl.configure
import org.gradle.kotlin.dsl.dependencies
import org.jetbrains.kotlin.gradle.dsl.KotlinJvmProjectExtension

class KafkaConfigConventionPlugin : Plugin<Project> {
override fun apply(target: Project) = with(target) {
Expand All @@ -12,6 +14,12 @@ class KafkaConfigConventionPlugin : Plugin<Project> {
pluginManager.apply("org.jetbrains.kotlin.kapt")
pluginManager.apply("org.jetbrains.kotlin.plugin.allopen")
pluginManager.apply("org.jetbrains.kotlin.plugin.spring")

// JVM 21 설정
extensions.configure<KotlinJvmProjectExtension> {
jvmToolchain(21)
}

// 필요한 최소 의존성만 추가
dependencies {
add("implementation", "org.springframework.boot:spring-boot-starter-validation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ class SpringBootAppConventionPlugin : Plugin<Project> {
add("implementation", "org.springframework.boot:spring-boot-starter-json")
add("implementation", "org.jetbrains.kotlin:kotlin-reflect")

// Observability - Actuator + Prometheus
// Observability - Actuator + Prometheus + Tracing
add("implementation", "org.springframework.boot:spring-boot-starter-actuator")
add("implementation", "io.micrometer:micrometer-registry-prometheus")
add("implementation", "io.micrometer:micrometer-tracing-bridge-otel")

add("testImplementation", "org.springframework.boot:spring-boot-starter-test")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
factory.containerProperties.apply {
pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
isObservationEnabled = true // 트레이싱 활성화
}

return factory
Expand Down
2 changes: 1 addition & 1 deletion infrastructure/kafka/kafka-producer/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ dependencies {
implementation(project(":kafka-config"))
implementation("org.apache.avro:avro:1.11.4")
implementation("io.confluent:kafka-avro-serializer:7.9.2")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class KafkaProducerConfig<K : Serializable, V : SpecificRecordBase>(

@Bean
fun kafkaTemplate(): KafkaTemplate<K, V> {
return KafkaTemplate(producerFactory())
return KafkaTemplate(producerFactory()).apply {
setObservationEnabled(true)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ class KafkaProducerImpl<K : Serializable, V : SpecificRecordBase>(
return kafkaTemplate.send(topicName, message)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package io.github.hyungkishin.transentia.infra.adapter.`in`.messaging

import com.fasterxml.jackson.databind.ObjectMapper
import io.github.hyungkishin.transentia.application.service.AnalyzeTransferService
import io.github.hyungkishin.transentia.infra.config.TracingTransformerSupplier
import io.github.hyungkishin.transentia.infra.event.TransferEventMapper
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventAvroModel
import org.apache.kafka.streams.kstream.KStream
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.function.Function
Expand All @@ -22,6 +24,7 @@ import java.util.function.Function
* 특징:
* - Stateless 처리 (이전 이벤트 참조 불필요)
* - 실시간 차단/리뷰 판정
* - TracingTransformer로 분산 트레이싱 컨텍스트 전파
*/
@Configuration
class TransferEventConsumer(
Expand All @@ -35,15 +38,16 @@ class TransferEventConsumer(
fun processTransferEvents(): Function<KStream<String, TransferEventAvroModel>, KStream<String, String>> {
return Function { input ->
input
.transformValues(TracingTransformerSupplier())
.peek { key, event ->
log.info(
"[FDS단일분석] 이벤트 수신 - key={} eventId={} accountId={} amount={}",
key, event.eventId, event.receiverId, event.amount
)
}
.mapValues { _, event ->
.mapValues { event ->
try {
// 1. Avro → Domain Event 변환
// 1. Domain Event 변환
val domainEvent = transferEventMapper.toDomain(event)

// 2. FDS 분석 실행 (Application Layer)
Expand Down Expand Up @@ -92,6 +96,9 @@ class TransferEventConsumer(
)

objectMapper.writeValueAsString(errorResult)
} finally {
// MDC 정리
MDC.clear()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.github.hyungkishin.transentia.infra.config

import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventAvroModel
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.slf4j.MDC

/**
* Kafka Record 헤더에서 traceparent를 추출하여 MDC에 설정하는 Transformer
*
* Kafka Streams는 일반 KafkaConsumer와 달리 observationEnabled가 적용되지 않아서
* 수동으로 W3C Trace Context 헤더를 파싱해야 함
*
* traceparent 형식: {version}-{traceId}-{spanId}-{flags}
* 예: 00-130c0e23e150eb0ec69d4a4774cc1f03-47684cf7bc701ad3-01
*/
class TracingTransformer : ValueTransformerWithKey<String, TransferEventAvroModel, TransferEventAvroModel> {

private lateinit var context: ProcessorContext

override fun init(context: ProcessorContext) {
this.context = context
}

override fun transform(key: String?, value: TransferEventAvroModel): TransferEventAvroModel {
// 이전 MDC 정리
MDC.clear()

val headers = context.headers()
val traceparent = headers.lastHeader("traceparent")?.value()?.let { String(it) }

if (traceparent != null) {
// traceparent 형식: 00-{traceId}-{spanId}-{flags}
val parts = traceparent.split("-")
if (parts.size >= 3) {
MDC.put("traceId", parts[1])
MDC.put("spanId", parts[2])
}
}

return value
}

override fun close() {
MDC.clear()
}
}

/**
* TracingTransformer를 생성하는 Supplier
*/
class TracingTransformerSupplier : ValueTransformerWithKeySupplier<String, TransferEventAvroModel, TransferEventAvroModel> {
override fun get(): ValueTransformerWithKey<String, TransferEventAvroModel, TransferEventAvroModel> {
return TracingTransformer()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# ============================================
# Local 환경용 설정 (Docker 호스트명 → localhost)
# ============================================
spring:
kafka:
bootstrap-servers: localhost:9094
cloud:
stream:
kafka:
binder:
brokers: localhost:9094
streams:
binder:
brokers: localhost:9094
configuration:
schema.registry.url: http://localhost:8085

kafka-config:
bootstrap-servers: localhost:9094
schema-registry-url: http://localhost:8085
10 changes: 6 additions & 4 deletions services/fds/instances/api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ server:
spring:
application:
name: fds-service
autoconfigure:
exclude:
- org.springframework.boot.actuate.autoconfigure.tracing.BraveAutoConfiguration

# ============================================
# Spring Cloud Stream 설정
Expand Down Expand Up @@ -120,7 +123,6 @@ management:
tags:
application: ${spring.application.name}
tracing:
enabled: false
zipkin:
tracing:
endpoint: http://localhost:9999/disabled
enabled: true
sampling:
probability: 1.0 # 개발환경 100% 샘플링
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] [trace:%X{X-Trace-Id}] %logger{36} - %msg%n
</pattern>
<pattern>%d{HH:mm:ss.SSS} [%5level] [%X{traceId:-}] %logger{25} - %msg%n</pattern>
</encoder>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEvent
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferStatus
import io.github.hyungkishin.transentia.infrastructure.kafka.producer.service.KafkaProducer
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import java.time.Instant
import java.util.*

@Component
class KafkaTransferEventPublisher(
Expand Down Expand Up @@ -41,7 +39,6 @@ class KafkaTransferEventPublisher(
mapOf(
"eventType" to "TRANSFER_COMPLETED",
"eventVersion" to "v1",
"traceId" to (MDC.get("traceId") ?: UUID.randomUUID().toString()),
"producer" to "transfer-api",
"contentType" to "application/json"
)
Expand Down Expand Up @@ -79,7 +76,6 @@ class KafkaTransferEventPublisher(
mapOf(
"eventType" to "TRANSFER_COMPLETED",
"eventVersion" to "v1",
"traceId" to (MDC.get("traceId") ?: UUID.randomUUID().toString()),
"producer" to "transfer-api-fallback",
"contentType" to "application/json"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.github.hyungkishin.transentia.api.config

import org.slf4j.MDC
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.TaskDecorator
import org.springframework.core.task.support.ContextPropagatingTaskDecorator
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.Executor
Expand All @@ -24,28 +23,12 @@ class AsyncConfig {
executor.setWaitForTasksToCompleteOnShutdown(true)
executor.setAwaitTerminationSeconds(30)

// TaskDecorator 적용
executor.setTaskDecorator(mdcTaskDecorator())
// Spring Boot 3.0+ ContextPropagatingTaskDecorator
// MDC + Micrometer Observation Context 모두 전파
executor.setTaskDecorator(ContextPropagatingTaskDecorator())

executor.initialize()
return executor
}

// MDC 정보 전파를 위한 TaskDecorator
private fun mdcTaskDecorator(): TaskDecorator {
return TaskDecorator { runnable ->
val contextMap = MDC.getCopyOfContextMap()
Runnable {
try {
if (contextMap != null) {
MDC.setContextMap(contextMap)
}
runnable.run()
} finally {
MDC.clear()
}
}
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# ============================================
# Local 환경용 설정 (Docker 호스트명 → localhost)
# ============================================
kafka-config:
bootstrap-servers: localhost:9094
schema-registry-url: http://localhost:8085
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
spring:
application:
name: transfer-api
autoconfigure:
exclude:
- org.springframework.boot.actuate.autoconfigure.tracing.BraveAutoConfiguration
datasource:
url: jdbc:postgresql://localhost:5432/transfer
username: postgres
Expand Down Expand Up @@ -79,7 +82,6 @@ management:
tags:
application: ${spring.application.name}
tracing:
enabled: false
zipkin:
tracing:
endpoint: http://localhost:9999/disabled
enabled: true
sampling:
probability: 1.0 # 개발환경 100% 샘플링
Loading