diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 026cd93..61cf825 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -3,7 +3,7 @@ name: CD - Build & Deploy on: push: branches: - - feature/traces-drilldown + - feature/bulk-v3 # - main workflow_dispatch: diff --git a/.gitignore b/.gitignore index 352e7fc..4c9823c 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,9 @@ yarn-debug.log* yarn-error.log* pnpm-debug.log* lerna-debug.log* +# Ignore all Markdown files repo-wide… +*.md +# …but keep README.md files tracked +!README.md +# Keep project-specific ignores inside subdirectories (.gitignore within backend/, frontend/, etc.) # Keep project-specific ignores inside subdirectories (.gitignore within backend/, frontend/, etc.) diff --git a/AGENTS.md b/AGENTS.md index e52a796..a71bb27 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,7 +1,7 @@ # Repository Guidelines ## Project Structure & Module Organization -모노레포 루트에는 `backend`(주요 NestJS 모노앱), `producerServer`, `app_api`, `infra`, `k8s_userside_log_generator` 가 있습니다. 일상 개발은 `backend` 디렉터리에서 수행하며 `src/query-api`(읽기 전용 API), `src/stream-processor`(카프카 소비자), `src/shared`(공용 DTO·서비스)로 나뉩니다. 런타임 산출물은 `dist/`에 생성되고, 환경 전용 설정은 `.env`, `.env.local`, `infra/docker-compose.yml` 등에 위치합니다. 테스트 스위트는 `src/**/*.spec.ts`에 두고, 샘플 수집 스크립트는 `src/stream-processor/log-consumer/app` 하위에 있습니다. +모노레포 루트에는 `backend`(주요 NestJS 모노앱), `producerServer`, `app_api`, `infra` 가 있습니다. 일상 개발은 `backend` 디렉터리에서 수행하며 `src/query-api`(읽기 전용 API), `src/stream-processor`(카프카 소비자), `src/shared`(공용 DTO·서비스)로 나뉩니다. 런타임 산출물은 `dist/`에 생성되고, 환경 전용 설정은 `.env`, `.env.local`, `infra/docker-compose.yml` 등에 위치합니다. 테스트 스위트는 `src/**/*.spec.ts`에 두고, 샘플 수집 스크립트는 `src/stream-processor/log-consumer/app` 하위에 있습니다. ## Build, Test, and Development Commands 모든 명령은 `backend`에서 실행합니다. `npm run start:query-api`, `npm run start:log-consumer`, `npm run start:metrics-consumer` 는 각각의 Nest 엔트리를 ts-node로 부팅합니다. 프로덕션 빌드는 `npm run build` 또는 서비스별 `npm run build:query-api`, `npm run build:stream-processor` 로 생성합니다. 로컬 통합 확인 시 `docker compose -f infra/docker-compose.yml up kafka elasticsearch redis` 로 인프라를 띄운 뒤 애플리케이션을 npm 스크립트로 실행합니다. 샘플 이벤트는 `npm run test:app-log`·`npm run test:http-log` 로 발행합니다. diff --git a/backend/README.md b/backend/README.md index f0c0b4c..f8b3a1a 100644 --- a/backend/README.md +++ b/backend/README.md @@ -67,6 +67,17 @@ Kafka 컨슈머 처리량을 주기적으로 파악하고 싶다면 다음 환 | `STREAM_THROUGHPUT_MIN_INTERVAL_MS` | `10000` | 처리량 로그 사이의 최소 간격(ms). 너무 잦은 로깅을 방지합니다. | | `STREAM_THROUGHPUT_TARGET_COUNT` | _(옵션)_ | 총 N건 처리 완료까지의 예상 소요 시간을 로그에 함께 표시합니다. | +### Bulk 색인 버퍼 옵션 + +`apm.logs`/`apm.spans` 컨슈머는 Elasticsearch `_bulk` API로 배치 색인을 수행합니다. 아래 환경 변수를 통해 버퍼 크기와 플러시 동시성을 조정할 수 있습니다. + +| 변수 | 기본값 | 설명 | +| --- | --- | --- | +| `BULK_BATCH_SIZE` | `500` | 버퍼에 일정 건수 이상 쌓이면 즉시 flush 합니다. | +| `BULK_BATCH_BYTES_MB` | `5` | 문서 크기 합계가 지정한 MB를 넘기면 즉시 flush 합니다. | +| `BULK_FLUSH_INTERVAL_MS` | `1000` | 위 조건을 만족하지 않아도 해당 시간이 지나면 주기적으로 flush 합니다. | +| `BULK_MAX_PARALLEL_FLUSHES` | `1` | 동시에 실행할 bulk 요청 개수. 클러스터 부하에 맞게 1~4 사이에서 조정하세요. | + ### Query API 성능 프로파일링 서비스 메트릭 엔드포인트(`GET /services/{serviceName}/metrics`)가 Elasticsearch 집계를 수행하는데 걸린 시간을 확인하려면 `SERVICE_METRICS_PROFILE=true`를 설정하면 됩니다. diff --git a/backend/apm_bulk_design.md b/backend/apm_bulk_design.md new file mode 100644 index 0000000..7c61c46 --- /dev/null +++ b/backend/apm_bulk_design.md @@ -0,0 +1,198 @@ +# Design Specification: Bulk + Limited Concurrency Ingestion for Elasticsearch + +## 배경과 기존 문제 + +현행 APM 파이프라인은 Kafka에서 로그 메시지를 하나씩 읽어와 `index()` API로 Elasticsearch(E) 데이터스트림에 저장한다. 네트워크 왕복을 매 메시지마다 수행하고, 단건 `index()`가 끝날 때까지 기다린 뒤 다음 메시지를 처리하는 구조 때문에 초당 처리량이 낮고 병렬성이 사실상 없었다. 특히 수십만 건 이상의 로그를 처리할 때 소비자 스레드가 몇 시간 동안 대기하는 현상이 발생하였다. Elastic API 문서는 대규모 데이터 적재 시 단건 `index()` 대신 Bulk API를 사용해야 네트워크 오버헤드를 줄이고 색인 속도를 높일 수 있다고 강조한다【102369654029175†L753-L762】. 그리고 적절한 배치 크기를 찾기 위해 실험이 필요하며, HTTP 요청은 100 MB를 넘지 않도록 주의해야 한다【102369654029175†L753-L757】. 또한 다른 사용자들도 네트워크 왕복을 아끼기 위해 Bulk API를 병렬로 실행하면서 적당한 청크 크기(1 MB–10 MB)와 적정 동시 플러시 수를 찾아야 한다고 조언한다【750595237790637†L52-L61】. + +본 문서는 이러한 문제를 해결하고, **대량 로그(수십만~수백만 건)도 빠르게 색인하면서 at‑least‑once 보장**을 유지하는 아키텍처를 설계한다. Node (NestJS) + Kafka + Elasticsearch 환경을 대상으로 하지만, 설계 개념은 다른 언어와 프레임워크에도 적용 가능하다. + +## 목표 + +- Kafka에서 들어오는 로그를 **at‑least‑once**로 Elasticsearch에 저장하고, 색인 실패 시 재처리를 지원한다. +- **Bulk API**를 사용하여 단건 요청 대비 색인 속도를 크게 높인다. Medium 기사에서도 bulk indexing이 단건 저장보다 훨씬 빠르다고 강조한다【85091753486274†L114-L120】. Python 테스트에서도 1000개 문서를 단건으로 index 하는 데 124 초가 걸린 반면, `streaming_bulk`는 0.67 초, `parallel_bulk`는 0.46 초만에 끝났다【596974854923973†L30-L124】. +- 과도한 동시성으로 Elasticsearch 클러스터를 과부하하지 않도록 **동시 플러시 수를 제한**한다. 실제 사례에서도 32개의 동시 프로세스로 색인하던 것을 1개 프로세스로 줄이자 클러스터 지표가 크게 개선되었다【901591924758741†L125-L143】. +- Kafka consumer가 색인을 기다리지 않고 무제한으로 enqueue하여 메모리를 고갈시키는 일을 방지하기 위해 **back‑pressure** 메커니즘을 도입한다. +- 로그 데이터 변환/검증 과정과 색인 과정의 책임을 분리하여 재사용성을 높이고, 서비스 코드와 분리된 Bulk 인덱싱 모듈을 제공한다. + +## 아키텍처 개요 + +### 데이터 흐름 + +``` +Kafka (topic: apm.logs) ──> NestJS consumer ──> DTO 변환/검증 ──> BulkIndexer (버퍼/플러시) ──> Elasticsearch 데이터스트림 +``` + +![Bulk ingestion flow]({{file:file-3hAzDofY4oYDp2MfohfxJ2}}) + +1. **Kafka consumer**는 메시지를 받아 DTO 변환 및 필수 검증을 수행한다. 검증에서 오류가 발생하면 해당 메시지를 건너뛰고 Kafka offset을 커밋한다. +2. DTO를 Elasticsearch 문서 구조(`LogDocument`)로 매핑한 뒤 **BulkIndexer**에 enqueue 한다. 이 단계에서는 실제 색인 작업을 수행하지 않고, 메모리 버퍼에 문서와 offset 정보를 저장한다. enqueue 호출은 `Promise`를 반환하며, 문서가 색인되면 resolve되고 실패 시 reject된다. +3. **BulkIndexer**는 배치 크기 또는 시간 조건이 만족될 때 버퍼에 쌓인 문서를 `_bulk` API로 전송한다. 클라이언트는 `Content‑Type: application/x‑ndjson`를 사용하여 NDJSON 포맷으로 전송해야 한다는 Elastic 문서를 준수한다【102369654029175†L738-L752】. +4. Flush 결과를 보고 개별 문서의 `resolve/reject`를 호출한다. 한 아이템이라도 실패하면 단순화된 전략으로 모든 문서를 실패로 취급하여 Kafka 재처리를 유도한다. 이로써 at‑least‑once 보장을 유지하지만 중복 삽입 가능성을 인정한다. +5. Consumer handler는 enqueue 반환값을 `await`하므로, 문서가 성공적으로 색인돼야만 offset을 커밋한다. 실패 시 예외를 발생시켜 Kafka consumer가 재시도하게 한다. + +### BulkIndexer 버퍼 설계 + +BulkIndexer는 싱글톤 서비스로서 다음과 같은 상태를 가진다: + +- `buffer: Array` — 각 항목은 `document`, Kafka offset 메타데이터, `resolve`/`reject` 콜백을 포함한다. +- `flushTimer: NodeJS.Timeout | null` — 시간 기반 플러시 예약용 타이머. +- `inFlightFlushes: number` — 현재 진행 중인 flush의 수. `MAX_PARALLEL_FLUSHES` 한도를 초과하면 새 flush를 미뤄서 클러스터 과부하를 방지한다. + +#### enqueue 동작 + +`enqueue(document, offsetInfo): Promise`는 다음과 같이 동작한다. + +1. `Promise`를 생성하고, `resolve`/`reject`를 `BufferedItem`에 저장한다. +2. `buffer`에 아이템을 추가한다. +3. 버퍼 길이가 `BATCH_SIZE` 이상이면 즉시 `flush()`를 시도한다. `flush()`는 `inFlightFlushes < MAX_PARALLEL_FLUSHES`일 때만 실행되며, 그렇지 않으면 다음 enqueue 또는 timer에서 재시도한다. +4. `flushTimer`가 설정되어 있지 않으면 `setTimeout(flush, FLUSH_INTERVAL_MS)`를 등록하여 일정 시간이 지나도 남아있는 문서를 flush할 수 있게 한다. +5. `Promise`를 반환하여 consumer handler가 `await`할 수 있도록 한다. + +#### flush 동작 + +`flush()`는 버퍼에서 현재 쌓여있는 항목을 모두 꺼내 하나의 bulk 요청으로 전송한다. + +- 메모리 버퍼는 매 flush 후 비워진다. 다른 메시지가 들어오면 새 버퍼에서 다시 쌓인다. +- `_bulk` 요청은 NDJSON 형식으로 `{ "index" : {"_index": data_stream} }\n{document}\n` 패턴으로 구성한다. +- HTTP 요청은 100 MB를 넘지 않도록 해야 한다는 Elastic 문서를 준수한다【102369654029175†L753-L757】. 배치 크기 기준은 문서 수(`BATCH_SIZE`) 또는 추정 바이트 수(예: 5 MB)를 동시에 고려할 수 있다. +- 요청 결과에서 `errors` 필드가 `true`이면 **간단한 전략으로 전체 배치를 실패로 간주**한다. Jörg Prante의 조언처럼 하나의 bulk 내 아이템들이 실패하면 동시에 재시도하는 것이 구현을 단순화한다【750595237790637†L52-L61】. +- 성공 시 각 아이템의 `resolve()`를 호출하여 consumer handler가 정상적으로 offset을 커밋할 수 있도록 한다. 실패 시 `reject()`를 호출하여 예외를 propagate 하고 Kafka에서 재처리한다. + +#### 플러시 조건 + +- `BATCH_SIZE` (문서 개수 기준): 500 ~ 1000개부터 시작하여 환경에 따라 조정한다. Elastic 커뮤니티에서는 1 MB ~ 10 MB 정도의 bulk 크기를 권장하며, 너무 큰 청크(예: 64 MB 이상)는 GC나 heap 부하를 유발할 수 있다고 한다【750595237790637†L52-L61】. +- `FLUSH_INTERVAL_MS` (시간 기준): 배치 크기에 도달하지 않더라도 일정 시간(예: 1000 ms) 이상 버퍼에 머무는 문서가 있으면 flush한다. 낮은 트래픽 환경에서 문서가 너무 오래 지연되는 것을 막는다. +- `MAX_PARALLEL_FLUSHES`: 동시에 몇 개의 bulk 요청을 허용할지 제어한다. 값이 1이면 한 번에 하나만 flush하고, 나머지는 버퍼에 대기한다. 높은 동시성을 주려면 2 ~ 4까지 늘리고, 클러스터에서 429 (too many requests) 에러가 발생하면 줄인다【85091753486274†L118-L129】. + +### Kafka offset 커밋 전략 + +NestJS Kafka 마이크로서비스는 handler가 `Promise`를 반환하고 예외를 발생시키는지 여부를 기준으로 offset을 커밋하거나 재처리한다. 따라서 bulk flush 결과가 성공했을 때만 enqueue에서 반환된 `Promise`를 resolve하고, 메시지 처리 함수(`handleLogEvent`)는 `await`를 통해 색인 완료를 보장해야 한다. 실패 시 reject하여 예외가 전파되면 offset이 커밋되지 않고 Kafka가 재전송한다. 이 방식은 **at‑least‑once** 보장을 유지하는 동시에 기본 NestJS API를 변경하지 않아 구현이 간단하다. + +### 오류 처리 전략 + +1. **DTO 검증 오류**: 현재 구조처럼 `InvalidLogEventError`를 구분하여 메시지를 skip 하고 성공적으로 offset을 커밋한다. 잘못된 이벤트는 재시도해도 성공할 가능성이 없으므로 유실을 허용한다. +2. **bulk 전체 실패(네트워크 오류 등)**: flush한 모든 문서를 실패로 간주하고 각 `reject`를 호출한다. Kafka에서는 재처리되어 중복 색인이 발생할 수 있으나 로그 데이터는 idempotent하지 않아도 되므로 허용한다. +3. **bulk 부분 실패**: Elasticsearch 응답에서 `errors`가 `true`일 때 성공/실패 항목을 따로 구분할 수 있다. 구현이 복잡해지므로 초기에는 **전체 배치를 실패**로 간주하는 단순 전략을 사용한다. 향후 요구 사항에 따라 실패한 문서만 재처리하거나 별도의 DLQ(Dead Letter Queue)를 도입할 수 있다. + +### 동시성 및 back‑pressure + +노드 프로세스는 이벤트 루프를 사용하므로 무제한으로 I/O를 발행할 경우 메모리나 소켓 수가 고갈될 수 있다. 따라서 BulkIndexer는 `MAX_PARALLEL_FLUSHES`를 통해 한 번에 수행할 flush 수를 제한한다. 버퍼에 데이터가 넘치면 Kafka consumer는 자연스럽게 `await enqueue` 호출에서 대기하게 되어 back‑pressure가 형성된다. 이를 통해 consumer가 처리 가능한 속도 이상으로 메시지를 읽어오지 않게 된다. 실제 사례에서도 색인 동시성을 32에서 1로 줄이자 클러스터 CPU와 검색 지연이 크게 줄었다【901591924758741†L125-L143】. + +#### 다중 작업자(Workers) 사용과 429 대응 + +Elastic Docs는 단일 스레드가 bulk 요청을 보내서는 클러스터 자원을 모두 활용할 수 없다고 설명한다【389495691530842†L924-L931】. 따라서 여러 스레드나 프로세스를 사용해 동시적으로 bulk 요청을 보내면 I/O 비용을 나누고 전체 처리량을 높일 수 있다. 그러나 너무 많은 병렬 작업자는 클러스터의 메모리와 CPU를 고갈시켜 `TOO_MANY_REQUESTS (429)` 오류를 초래할 수 있다【389495691530842†L932-L941】. 권장 사항은 다음과 같다: + +* **복수 작업자/스레드 사용**: 한 스레드만으로는 클러스터를 포화시키기 어렵다. 여러 작업자를 통해 병렬로 bulk 요청을 전송해 전체 throughput을 높인다【389495691530842†L924-L931】. +* **적정 동시성 찾기**: 적정한 작업자 수는 환경마다 다르므로 실험적으로 찾아야 한다. 한 번에 작업자 수를 조금씩 늘리면서 CPU나 디스크 I/O가 포화되는 지점을 확인한다【389495691530842†L943-L945】. +* **429 오류 감지와 백오프**: 클러스터가 과부하되면 `EsRejectedExecutionException`을 통해 429 코드를 반환한다【389495691530842†L937-L941】. 이때는 indexer가 잠시 대기 후 재시도하는 지수 백오프 로직을 적용해야 한다【389495691530842†L937-L941】. 앞서 언급한 Medium 기사에서도 429 오류가 발생할 때는 임의의 지연으로 확장 지수 백오프를 쓰라고 권장한다【85091753486274†L118-L129】. + +이러한 지침을 따라 `MAX_PARALLEL_FLUSHES`와 Kafka consumer 인스턴스 수를 조정하면 클러스터 리소스를 적절히 활용할 수 있다. 너무 적으면 쓰기 throughput이 낮아지고, 너무 많으면 429 오류와 지연이 발생한다. 따라서 모니터링 도구를 활용해 적절한 지점을 찾아야 한다. + +## Elasticsearch 설정 가이드 + +Bulk 인덱싱 성능은 클라이언트 설계뿐 아니라 Elasticsearch 클러스터 설정에도 크게 좌우된다. + +### Bulk API와 배치 크기 + +- Elastic 문서는 bulk 요청에 정답이 되는 문서 수가 없으며, 시스템 환경에 맞는 최적값을 찾아야 한다고 설명한다【102369654029175†L753-L755】. 요청의 크기는 100 MB 이하로 제한되어야 한다【102369654029175†L753-L757】. +- Elastic 커뮤니티의 조언에 따르면, 1 MB~10 MB 또는 500 ~ 1000개의 문서를 한 배치로 보내는 것이 일반적이다【750595237790637†L52-L61】. 너무 큰 배치(수십 MB 이상)는 GC와 heap 부하를 유발하고, 너무 작은 배치는 네트워크 왕복이 많아진다. + +### Refresh interval 조정 + +- 색인된 문서를 검색에서 볼 수 있도록 만드는 **refresh** 작업은 비용이 크며, 지나치게 자주 호출하면 색인 속도가 저하된다. Elastic Docs는 대용량 bulk 작업을 수행할 때 `index.refresh_interval`을 `-1`로 설정하여 refresh를 비활성화한 뒤, 작업 완료 후 다시 원하는 값으로 설정할 것을 권장한다【389495691530842†L947-L986】. refresh를 비활성화하면 색인 중에는 문서가 검색되지 않지만, 로그 파이프라인은 약간의 지연을 허용하므로 적합하다. +- 기본값(1 초 refresh)은 색인량이 적고 검색량이 적을 때 최적이지만, 정기적인 검색 요청이 있는 경우 refresh interval을 30 초로 늘리면 색인 성능이 개선될 수 있다【389495691530842†L953-L966】. + +### Replica 비활성화 + +- 대량 초기 적재 시 `index.number_of_replicas`를 0으로 설정하여 색인 성능을 높일 수 있다. Elastic Docs는 초기 로드를 빠르게 끝낸 뒤 다시 원래 값으로 되돌릴 것을 권장한다【389495691530842†L1011-L1018】. 다만 replica를 0으로 설정하면 노드 장애 시 데이터 손실 위험이 있으므로 외부 저장소나 Kafka에 데이터가 안전하게 남아있어야 한다. + +### 기타 튜닝 사항 + +- **index buffer size**와 **translog flush threshold**: 노드가 heavy indexing만 수행한다면 `indices.memory.index_buffer_size`를 충분히 크게(예: 최소 512 MB / shard) 설정하여 버퍼가 너무 자주 flush되지 않도록 한다【85091753486274†L241-L266】. translog의 `flush_threshold_size`를 늘려서 디스크 flush 빈도를 줄이면 성능이 향상될 수 있다【85091753486274†L241-L266】. +- **Auto‑generated IDs**: 자체 아이디를 지정하면 Elasticsearch가 기존 문서를 조회하여 중복 여부를 확인해야 하므로 느려질 수 있다. Elastic Docs는 auto‑generated id를 사용할 때 색인 성능이 향상된다고 설명한다【389495691530842†L1054-L1060】. 하지만 로그 데이터는 추적을 위해 trace id나 timestamp를 key로 사용할 수도 있으며, 이 경우 id 조회 비용을 감수하거나 idempotent 파이프라인을 설계해야 한다. +- **Refresh interval과 replica 변경 이후 복구**: bulk 적재가 끝나면 refresh interval을 원래 값으로, replica를 원래 개수로 되돌린 뒤 필요하다면 `_forcemerge`를 수행하여 검색 성능을 최적화한다【389495691530842†L947-L1003】. + +## Node / NestJS 구현 지침 + +### BulkIndexer 서비스 인터페이스 + +```ts +interface OffsetInfo { + topic: string; + partition: number; + offset: string; +} + +interface BulkIndexer { + enqueue(document: LogDocument, offsetInfo: OffsetInfo): Promise; + flush(): Promise; +} +``` + +BulkIndexer를 NestJS의 provider로 등록하여 전체 프로세스에서 하나의 인스턴스를 사용한다. `enqueue()`는 위에서 설명한 버퍼 로직을 구현하고, `flush()`는 현재 버퍼를 `_bulk` API로 전송한다. flush는 `MAX_PARALLEL_FLUSHES`를 넘지 않는 범위에서 실행된다. + +### 서비스 연동 예시 흐름 + +1. **컨슈머** (`handleLogEvent`): DTO 파싱, 검증 후 `logIngestService.ingest(dto)`를 호출한다. +2. **logIngestService**: DTO를 Elasticsearch 문서로 변환하고 `bulkIndexer.enqueue(document, offsetInfo)`를 호출한다. 반환되는 `Promise`를 `await`하여 색인 완료를 기다린다. +3. **BulkIndexer**: 내부 버퍼에 데이터와 offset을 저장하고 플러시 조건을 점검한다. +4. **Kafka consumer config**: `eachMessage`/`handleLogEvent`에서 발생한 예외는 NestJS Kafka가 재시도를 처리하게 한다. `autoCommit: false` 또는 기본 자동 커밋 설정과 함께 at‑least‑once를 유지할 수 있도록 NestJS handler 패턴을 그대로 사용한다. + +### 제한된 동시성 구현 + +`MAX_PARALLEL_FLUSHES` 값을 통해 몇 개의 bulk 요청을 동시에 처리할지 제어한다. 예를 들어 1로 두면 항상 하나씩 flush하고, 2 이상으로 올리면 Elastic 클러스터의 thread pool과 대기열이 허용하는 범위까지 동시 요청을 늘린다. 실험을 통해 적정 값을 찾아야 한다. Elastic 커뮤니티에서는 먼저 1 MB batch / 1 thread, 다음 2 MB / 1 thread, 다음 2 MB / 2 threads 등으로 조합을 실험해 보고 throughput이 떨어지는 지점을 찾으라고 조언한다【750595237790637†L52-L61】. + +### 배치 크기 추정 + +Batch size를 문서 개수와 바이트 수 기준으로 동시에 제어하는 것이 이상적이다. 각 문서의 JSON 문자열 길이(또는 Buffer 길이)를 측정하여 누적 크기를 추적하고, 5 MB ~ 10 MB를 넘지 않도록 flush한다. Jörg Prante는 64 MB bulk가 너무 크며 GC 지연을 유발할 수 있다고 지적했다【750595237790637†L52-L61】. + +### 에러·재시도 정책 + +Elasticsearch `_bulk` API는 응답에 `items` 배열을 포함하여 각 작업의 성공/실패 여부를 알려준다. 단순한 첫 구현에서는 **하나의 문서라도 실패하면 전체 배치 실패**로 처리하여 메시지를 재수신하도록 한다. 향후에는 부분 실패 문서만 재시도하는 로직을 추가하거나 Kafka DLQ(Dead Letter Queue)를 도입할 수 있다. 클러스터가 과부하되어 `429 Too Many Requests`가 발생하는 경우에는 bulkIndexer가 **지수 백오프**를 적용하여 재시도할 수 있다【85091753486274†L118-L129】. + +### DTO 검증 최적화 + +대량 ingest 시 class‑validator 기반 DTO 검증이 병목이 될 수 있다. 필수 필드만 간단히 체크하여 검증 속도를 높이고, 복잡한 검증은 API 단이나 별도의 경로에서 수행하는 것이 좋다. 적재 모드에서는 debug 로그를 문서마다 찍지 말고 일정 건수마다 집계 로그를 찍어 I/O 오버헤드를 줄인다. + +### 멀티 파티션·병렬 컨슈머 + +Kafka topic의 파티션 수를 늘리고 동일한 consumer group으로 여러 인스턴스를 실행하면 파티션 단위로 workload가 분산된다. BulkIndexer는 프로세스 내부에서만 상태를 공유하므로 각 컨슈머 인스턴스에 하나씩 생성된다. 클러스터 전체를 고려할 때, BulkIndexer의 동시 flush 수(`MAX_PARALLEL_FLUSHES`) 곱하기 인스턴스 수가 Elasticsearch cluster의 쓰기 스레드 풀을 넘지 않도록 주의한다. 클러스터가 429 에러를 반환하면 consumer 인스턴스 수 또는 flush 동시성을 줄이고, 지수 백오프를 도입하여 쓰기 속도를 조절한다. + +## 벤치마킹과 튜닝 + +### 초기 파라미터 제안 + +| 파라미터 | 초기값 | 설명 | +|---|---|---| +| `BATCH_SIZE` | 500 ~ 1000 | 한 batch당 문서 수. 작은 값은 네트워크 오버헤드가 크고, 너무 큰 값은 ES heap/GC에 부담을 준다【750595237790637†L52-L61】. | +| `BATCH_BYTE_LIMIT` | 5 MB | 문서 크기를 합산하여 5 MB를 넘기 전에 flush. Elastic 커뮤니티에서 1 MB ~ 10 MB 사이를 권장함【750595237790637†L52-L61】. | +| `FLUSH_INTERVAL_MS` | 1000 ms | 문서가 적게 들어오는 상황에서 1 초마다 flush하여 지연을 줄인다. | +| `MAX_PARALLEL_FLUSHES` | 1 | 한 번에 하나의 bulk 요청을 수행하여 클러스터 과부하를 방지하고 관찰하기 쉽게 한다. 필요시 2 ~ 4까지 늘려 실험한다【901591924758741†L125-L143】. | + +### 튜닝 방법 + +1. **배치 크기 조정**: 모니터링 도구(Kibana, Elastic APM 등)를 통해 bulk 요청의 처리 시간(`took`), 성공률, CPU 사용량을 관찰한다. 1 MB batch / 1 thread부터 시작해서 조금씩 크기나 동시성을 늘린 뒤 throughput이 떨어지는 지점을 찾는다【750595237790637†L52-L61】. +2. **Refresh interval/replica 변경**: 대량 적재를 진행하기 직전 해당 인덱스의 `refresh_interval`을 `-1`로 변경하고, `number_of_replicas`를 0으로 변경한다. 적재가 끝나면 원래 값으로 복구하고 필요 시 force merge를 수행한다【389495691530842†L947-L1003】. +3. **동시성 조정**: bulk flush의 동시에 실행되는 개수를 늘리면 throughput이 증가할 수 있으나, `429 Too Many Requests`가 나타나거나 검색 지연이 급증하면 값을 줄여야 한다. People.ai 사례에서는 동시 프로세스를 32에서 1로 줄여 클러스터 부하가 줄었다【901591924758741†L125-L143】. 용량이 충분한 클러스터에서는 2 ~ 4까지 늘려볼 수 있다. +4. **지수 백오프**: flush가 실패하거나 `429`를 받으면 1 초, 2 초, 4 초 등 점점 길게 대기한 뒤 재시도하여 클러스터에 과도한 부하를 주지 않도록 한다【85091753486274†L118-L129】. +5. **문서 설계 최적화**: 불필요하게 큰 문서나 깊은 중첩을 피하고, 필요하지 않은 필드는 매핑에서 제외한다. Search Guard 블로그에서도 인덱싱 성능을 높이려면 불필요한 필드를 줄이고 문서 크기를 최소화해야 한다고 권고한다【71945213822368†L100-L110】. + +## 추가 고려 사항 + +### 인덱스 롤오버 및 시간 기반 설계 + +로그와 APM 데이터는 시간이 지남에 따라 방대해지므로 **데이터스트림 + ILM(인덱스 라이프사이클 관리)**를 사용하여 새로운 세그먼트를 주기적으로 롤오버하고 오래된 세그먼트를 삭제하는 것이 좋다. 롤오버 간격(예: 하루, 10GB 등)을 정의하여 각 데이터스트림이 적당한 크기를 유지하도록 한다. 이는 클러스터의 merge 부하를 줄이고 검색 성능을 안정적으로 유지한다. + +### 롤업/집계 파이프라인 분리 + +1분 단위 버킷 카운트 같은 집계 작업은 ingest 경로에서 수행하지 말고, 별도의 후처리 파이프라인(ES Transform, Aggregation API + cron job, Redis 카운터 등)으로 분리하여 색인 성능에 영향을 주지 않도록 한다. 같은 컨슈머 경로에서 집계까지 수행하면 per-message 처리 시간이 길어져 throughput을 크게 떨어뜨린다. + +### 하드웨어 및 운영 환경 + +- SSD 디스크 사용, 충분한 RAM, 그리고 Elasticsearch heap 사이즈를 전체 메모리의 50% 이하로 설정해 filesystem cache를 활용하는 것이 좋다【71945213822368†L66-L72】. 또한 swap을 비활성화하고 JVM heap 사이즈를 적절히 설정해야 한다【389495691530842†L1024-L1037】. +- Translog 크기(`flush_threshold_size`)와 인덱싱 버퍼를 조절해 flush/merge 빈도를 줄이면 성능이 향상될 수 있다【85091753486274†L241-L266】. +- Node 애플리케이션에서 CPU 집약적인 로직(class-validator, JSON 변환 등)을 줄이고, 로그 출력을 최소화해 I/O 부하를 줄인다. + +## 결론 + +Bulk API와 제한된 동시성 전략을 통해 APM 로그 파이프라인의 색인 속도를 **수십 배 이상** 향상시킬 수 있다. 핵심은 메시지를 메모리 버퍼에 모았다가 일정 크기·시간마다 `_bulk` 요청으로 묶어 보내는 **BulkIndexer**를 도입하고, 동시에 플러시되는 요청 수를 제어하여 Elasticsearch 클러스터를 과부하하지 않는 것이다. Elastic 문서는 적절한 배치 크기와 refresh interval 조정을 통해 색인 성능을 최적화할 수 있다고 강조한다【389495691530842†L947-L986】, 【102369654029175†L753-L757】. 커뮤니티 경험도 1 MB~10 MB 정도의 청크와 실험을 통한 sweet spot 찾기를 권장하며, replica를 0으로 두고 refresh를 비활성화하면 초기 적재 속도를 크게 높일 수 있다고 말한다【750595237790637†L52-L61】. 이러한 설계를 코드화하면 단건 `index()` 기반 구조를 리팩터링하여 수십만 건의 로그도 수 분 이내에 색인할 수 있을 것이다. \ No newline at end of file diff --git a/backend/bulk_indexer_backpressure_fix.md b/backend/bulk_indexer_backpressure_fix.md new file mode 100644 index 0000000..047b4f9 --- /dev/null +++ b/backend/bulk_indexer_backpressure_fix.md @@ -0,0 +1,610 @@ +# Kafka + BulkIndexer 병목 원인 분석 및 리팩토링 명세 + +본 문서는 **APM 로그 컨슈머에서 bulk 인덱싱으로 전환한 이후 성능이 기존 단건 index 대비 크게 떨어진 문제**의 원인을 정리하고, +그에 대한 **구체적인 코드 리팩토링 가이드**를 제공한다. 이 문서를 Codex에 넘겨서 자동으로 리팩토링할 수 있도록 +**의도·배경·변경 포인트·코드 예시**를 모두 포함한다. + +--- + +## 1. 현상 요약 + +- Kafka 토픽에 약 **100만 개 로그 메시지**가 적재되어 있음. +- 컨슈머 lag는 **100만 개 모두 소비 완료**로 표시됨. +- 하지만 **Elasticsearch 실제 인덱싱 속도는 분당 ~60건 수준**으로 매우 느림. +- bulk 도입 전, 각 메시지마다 `client.index()`로 단건 인덱싱할 때는 **분당 ~7,000건** 수준의 처리량이 나왔음. +- 즉, **bulk 인덱싱 도입 이후 소비는 잘 되는데 색인은 거의 안 되는 상황**. + +이 문제는 Kafka 컨슈머 쪽의 **back-pressure 설계와 BulkIndexer의 Promise 사용 방식**이 겹치면서, +**“파티션당 1초에 1건” 수준으로 소비가 직렬화되어 버린 것**이 근본 원인이다. + +--- + +## 2. 현재 처리 흐름 정리 + +### 2.1 Kafka → LogConsumerController + +```ts +@EventPattern(process.env.KAFKA_APM_LOG_TOPIC ?? "apm.logs") +async handleLogEvent(@Ctx() context: KafkaContext): Promise { + const value = context.getMessage().value; + if (value == null) { + this.logger.warn("Kafka 메시지에 본문이 없어 처리를 건너뜁니다."); + return; + } + + try { + const dto = this.parsePayload(value); + await this.logIngestService.ingest(dto); // (1) 여기서 bulk enqueue를 await + await this.errorLogForwarder.forward(dto); // (2) 에러 로그 포워딩 + this.throughputTracker.markProcessed(); + ... + } catch (error) { + ... + throw error; + } +} +``` + +핵심 포인트: `handleLogEvent`는 **`logIngestService.ingest()`를 await** 한다. + +--- + +### 2.2 LogIngestService (Kafka DTO → ES 문서) + +```ts +@Injectable() +export class LogIngestService { + private static readonly STREAM_KEY: LogStreamKey = "apmLogs"; + + constructor(private readonly bulkIndexer: BulkIndexerService) {} + + async ingest(dto: LogEventDto): Promise { + const document: LogDocument = { + "@timestamp": this.resolveTimestamp(dto.timestamp), + type: "log", + service_name: dto.service_name, + environment: dto.environment, + trace_id: dto.trace_id, + span_id: dto.span_id, + level: dto.level, + message: dto.message, + http_method: dto.http_method, + http_path: dto.http_path, + http_status_code: dto.http_status_code, + labels: this.normalizeLabels(dto.labels), + ingestedAt: new Date().toISOString(), + }; + + // 로그 문서를 bulk 버퍼에 적재하고 flush가 끝날 때까지 기다린다. + await this.bulkIndexer.enqueue(LogIngestService.STREAM_KEY, document); + } +} +``` + +핵심 포인트: `ingest()`는 **BulkIndexerService.enqueue()가 resolve될 때까지 기다리는 async 함수**이다. + +--- + +### 2.3 BulkIndexerService (버퍼 → ES `_bulk`) + +```ts +interface BufferedItem { + index: string; + document: BaseApmDocument; + size: number; + resolve: () => void; + reject: (error: Error) => void; +} + +enqueue(streamKey: LogStreamKey, document: BaseApmDocument): Promise { + const indexName = this.storage.getDataStream(streamKey); + const size = + Buffer.byteLength(JSON.stringify({ index: { _index: indexName } })) + + Buffer.byteLength(JSON.stringify(document)) + + 2; + + return new Promise((resolve, reject) => { + this.buffer.push({ index: indexName, document, size, resolve, reject }); + this.bufferedBytes += size; + if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else { + this.ensureFlushTimer(); + } + }); +} +``` + +`enqueue()`는 **해당 문서가 포함된 bulk flush가 끝났을 때 resolve되는 Promise**를 반환한다. + +```ts +private async executeFlush(batch: BufferedItem[]): Promise { + const operations = this.buildOperations(batch); + try { + const response = await this.client.bulk({ operations }); + if (response.errors) { + this.logBulkError(response); + const error = new Error("Bulk 색인 중 일부 문서가 실패했습니다."); + batch.forEach((item) => item.reject(error)); + return; + } + batch.forEach((item) => item.resolve()); + this.logger.debug( + `Bulk 색인 완료 batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } catch (error) { + const wrapped = + error instanceof Error + ? error + : new Error(`Bulk 색인 실패: ${String(error)}`); + batch.forEach((item) => item.reject(wrapped)); + this.logger.warn( + "Bulk 색인 요청이 실패했습니다. Kafka 컨슈머가 재시도합니다.", + wrapped.stack, + ); + } +} +``` + +핵심 포인트: **flush 결과에 따라 batch 내 모든 Promise를 resolve/reject** 한다. + +--- + +## 3. 근본 원인: Kafka 파티션 단위 직렬 처리 + bulk Promise 연동 + +### 3.1 Kafka eachMessage / NestJS EventPattern의 기본 동작 + +- NestJS Kafka 마이크로서비스는 내부적으로 `kafkajs`의 `eachMessage` 패턴을 사용한다. +- `eachMessage`는 **같은 파티션에 대해서는 한 번에 하나의 메시지 handler만 실행**한다. +- 즉, `handleLogEvent()`가 반환되기 전까지 **같은 파티션에서 다음 메시지를 넘겨주지 않는다.** + +### 3.2 현재 구조에서 실제로 일어나는 일 + +1. 파티션 P0에 메시지 M1이 들어온다. +2. NestJS가 `handleLogEvent(M1)`을 호출한다. +3. `handleLogEvent()` 내부에서: + - `parsePayload()`로 DTO로 변환. + - `await this.logIngestService.ingest(dto);` 호출. +4. `logIngestService.ingest()`에서: + - `await this.bulkIndexer.enqueue(...);` 호출. +5. `BulkIndexer.enqueue()`는: + - `buffer.push(M1)` 한 뒤, + - **해당 문서를 포함하는 bulk flush가 끝날 때까지 resolve되지 않는 Promise를 반환**. +6. `enqueue()`의 Promise가 resolve될 때까지: + - `ingest()`는 반환하지 않음. + - `handleLogEvent()`도 반환하지 않음. + - 따라서 **kafkajs는 P0 파티션의 다음 메시지(M2)를 handler에 넘기지 않는다.** + +### 3.3 버퍼 / 타이머와의 상호작용 + +- 기본 설정 (예시): + - `BULK_BATCH_SIZE = 1000` + - `BULK_FLUSH_INTERVAL_MS = 1000` (1초) +- 하지만 **동시에 여러 메시지를 소비하지 못하기 때문에** 버퍼에는 항상 “현재 처리 중인 메시지 1개 정도만” 들어온다. +- `shouldFlushBySize()` 조건은 거의 만족되지 않고, + - 타이머(`flushIntervalMs`)가 1초마다 bulk flush를 수행한다. +- 결과적으로: + - **파티션당 1초에 1개씩 bulk flush가 수행**된다. + - 각 flush에는 보통 1개 문서만 포함된다. + - => **파티션당 대략 초당 1건, 분당 60건 수준의 처리량**으로 떨어진다. +- 반대로, 단건 index일 때는: + - 각 메시지에서 `client.index()` 하나만 await → 대략 수 ms~수십 ms 내에 반환. + - 따라서 파티션당 초당 수십~수백 건을 처리할 수 있었다. + +즉, **“bulk flush가 끝날 때까지 Kafka 컨슈머가 block되는 구조”** 때문에, +bulk가 이득을 못 보고 **극단적인 under-utilization**을 만드는 것이 문제의 핵심이다. + +--- + +## 4. 리팩토링 목표 + +1. **Kafka 컨슈머는 bulk flush 완료까지 block되지 않고** 가능한 한 빨리 메시지를 소비한다. + - 메시지 소비 속도 = Kafka → 메모리 버퍼 적재 속도. +2. **BulkIndexer는 별도의 비동기 작업으로 버퍼를 묶어서 ES `_bulk` 호출**을 수행한다. +3. MVP 단계에서는: + - **엄격한 “flush 성공 후에만 offset commit”** 대신, + - **최소한의 코드 변경으로 throughput을 회복하는 전략**을 사용한다. + - 즉, flush 실패 시 Kafka에서 재처리하기보다는 **로그를 남기고 버리는(best-effort)** 쪽에 가깝다. + - 추후 필요하면 `eachBatch + manual commit` 패턴으로 고도화할 수 있다. + +--- + +## 5. 설계 변경안 (MVP 버전) + +### 5.1 BulkIndexerService.enqueue를 *non-blocking*으로 변경 + +#### 5.1.1 BufferedItem 타입 단순화 + +**변경 전** + +```ts +interface BufferedItem { + index: string; + document: BaseApmDocument; + size: number; + resolve: () => void; + reject: (error: Error) => void; +} +``` + +**변경 후** + +```ts +interface BufferedItem { + index: string; + document: BaseApmDocument; + size: number; +} +``` + +- 더 이상 각 문서마다 개별 Promise를 resolve/reject하지 않는다. +- Bulk flush는 **“fire-and-forget 배치 작업”**으로 취급한다. + +#### 5.1.2 enqueue() 시그니처 및 동작 변경 + +**변경 전** + +```ts +enqueue(streamKey: LogStreamKey, document: BaseApmDocument): Promise { + const indexName = this.storage.getDataStream(streamKey); + const size = + Buffer.byteLength(JSON.stringify({ index: { _index: indexName } })) + + Buffer.byteLength(JSON.stringify(document)) + + 2; + + return new Promise((resolve, reject) => { + this.buffer.push({ index: indexName, document, size, resolve, reject }); + this.bufferedBytes += size; + if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else { + this.ensureFlushTimer(); + } + }); +} +``` + +**변경 후 (핵심 아이디어)** + +```ts +enqueue(streamKey: LogStreamKey, document: BaseApmDocument): void { + const indexName = this.storage.getDataStream(streamKey); + const size = + Buffer.byteLength(JSON.stringify({ index: { _index: indexName } })) + + Buffer.byteLength(JSON.stringify(document)) + + 2; + + // 즉시 버퍼에 쌓고 반환한다. (Promise 없음) + this.buffer.push({ index: indexName, document, size }); + this.bufferedBytes += size; + + if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else { + this.ensureFlushTimer(); + } +} +``` + +- `enqueue()`는 더 이상 `Promise`를 반환하지 않는다. +- 호출자는 **flush 완료 여부를 기다리지 않고 바로 다음 로직으로 진행**한다. +- Kafka 컨슈머 입장에서는 **메시지를 메모리 버퍼에 적재하는 순간 이미 “처리 완료”로 간주**하게 된다. + +#### 5.1.3 executeFlush()에서 Promise 관련 코드 제거 + +**변경 전** + +```ts +private async executeFlush(batch: BufferedItem[]): Promise { + const operations = this.buildOperations(batch); + try { + const response = await this.client.bulk({ operations }); + if (response.errors) { + this.logBulkError(response); + const error = new Error("Bulk 색인 중 일부 문서가 실패했습니다."); + batch.forEach((item) => item.reject(error)); + return; + } + batch.forEach((item) => item.resolve()); + this.logger.debug( + `Bulk 색인 완료 batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } catch (error) { + const wrapped = + error instanceof Error + ? error + : new Error(`Bulk 색인 실패: ${String(error)}`); + batch.forEach((item) => item.reject(wrapped)); + this.logger.warn( + "Bulk 색인 요청이 실패했습니다. Kafka 컨슈머가 재시도합니다.", + wrapped.stack, + ); + } +} +``` + +**변경 후 (MVP용 단순 버전)** + +```ts +private async executeFlush(batch: BufferedItem[]): Promise { + const operations = this.buildOperations(batch); + try { + const response = await this.client.bulk({ operations }); + + if (response.errors) { + // 일부 문서 실패 → 상세 에러 로그만 남기고, Kafka 재처리는 하지 않는다. + this.logBulkError(response); + this.logger.warn( + `Bulk 색인 중 일부 문서가 실패했습니다. batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } else { + this.logger.debug( + `Bulk 색인 완료 batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } + } catch (error) { + const wrapped = + error instanceof Error + ? error + : new Error(`Bulk 색인 실패: ${String(error)}`); + + this.logger.warn( + "Bulk 색인 요청이 실패했습니다. Kafka 컨슈머는 메시지를 계속 처리합니다.", + wrapped.stack, + ); + } +} +``` + +- flush 실패 시 **Kafka 쪽으로 예외를 전달하지 않는다.** +- 대신 **실패 로그만 남기고 다음 batch로 넘어가는 구조**이다. +- 이로 인해 “ES 인덱싱 실패 시 동일 메시지를 Kafka에서 재처리하는” 기능은 사라지지만, + **현재 문제(throughput 급락)를 해결하는 것이 우선**인 MVP 단계에서는 합리적인 트레이드오프다. + +> 이후 고도화 단계에서, `eachBatch + manual commit` 구조를 도입해 +> flush 성공 여부에 따라 offset commit을 조절하는 전략으로 개선할 수 있다. + +--- + +### 5.2 LogIngestService.ingest를 sync 스타일로 변경 + +`BulkIndexerService.enqueue()`가 이제 `void`를 반환하므로, +`LogIngestService.ingest()`도 더 이상 async/await가 필요 없다. + +**변경 전** + +```ts +@Injectable() +export class LogIngestService { + private static readonly STREAM_KEY: LogStreamKey = "apmLogs"; + + constructor(private readonly bulkIndexer: BulkIndexerService) {} + + async ingest(dto: LogEventDto): Promise { + const document: LogDocument = { + "@timestamp": this.resolveTimestamp(dto.timestamp), + type: "log", + service_name: dto.service_name, + environment: dto.environment, + trace_id: dto.trace_id, + span_id: dto.span_id, + level: dto.level, + message: dto.message, + http_method: dto.http_method, + http_path: dto.http_path, + http_status_code: dto.http_status_code, + labels: this.normalizeLabels(dto.labels), + ingestedAt: new Date().toISOString(), + }; + + await this.bulkIndexer.enqueue(LogIngestService.STREAM_KEY, document); + } +} +``` + +**변경 후** + +```ts +@Injectable() +export class LogIngestService { + private static readonly STREAM_KEY: LogStreamKey = "apmLogs"; + + constructor(private readonly bulkIndexer: BulkIndexerService) {} + + ingest(dto: LogEventDto): void { + const document: LogDocument = { + "@timestamp": this.resolveTimestamp(dto.timestamp), + type: "log", + service_name: dto.service_name, + environment: dto.environment, + trace_id: dto.trace_id, + span_id: dto.span_id, + level: dto.level, + message: dto.message, + http_method: dto.http_method, + http_path: dto.http_path, + http_status_code: dto.http_status_code, + labels: this.normalizeLabels(dto.labels), + ingestedAt: new Date().toISOString(), + }; + + // ES bulk 버퍼에만 적재하고, flush 완료는 기다리지 않는다. + this.bulkIndexer.enqueue(LogIngestService.STREAM_KEY, document); + } +} +``` + +- 반환 타입을 `Promise` → `void`로 변경. +- 내부에서 `await` 제거. + +--- + +### 5.3 LogConsumerController.handleLogEvent에서 ingest await 제거 + +**변경 전** + +```ts +@EventPattern(process.env.KAFKA_APM_LOG_TOPIC ?? "apm.logs") +async handleLogEvent(@Ctx() context: KafkaContext): Promise { + const value = context.getMessage().value; + if (value == null) { + this.logger.warn("Kafka 메시지에 본문이 없어 처리를 건너뜁니다."); + return; + } + + try { + const dto = this.parsePayload(value); + await this.logIngestService.ingest(dto); + await this.errorLogForwarder.forward(dto); + this.throughputTracker.markProcessed(); + this.logger.debug( + `로그가 색인되었습니다. topic=${context.getTopic()} partition=${context.getPartition()}`, + ); + } catch (error) { + ... + throw error; + } +} +``` + +**변경 후** + +```ts +@EventPattern(process.env.KAFKA_APM_LOG_TOPIC ?? "apm.logs") +async handleLogEvent(@Ctx() context: KafkaContext): Promise { + const value = context.getMessage().value; + if (value == null) { + this.logger.warn("Kafka 메시지에 본문이 없어 처리를 건너뜁니다."); + return; + } + + try { + const dto = this.parsePayload(value); + + // 1) ES bulk 버퍼에 비동기로만 적재하고, Kafka 소비는 block하지 않는다. + this.logIngestService.ingest(dto); + + // 2) 에러 로그 포워딩은 기존과 같이 await (필요시 나중에 비동기로 바꿀 수 있음) + await this.errorLogForwarder.forward(dto); + + this.throughputTracker.markProcessed(); + this.logger.debug( + `로그 처리 완료: topic=${context.getTopic()} partition=${context.getPartition()}`, + ); + } catch (error) { + if (error instanceof InvalidLogEventError) { + this.logger.warn( + `유효하지 않은 로그 이벤트를 건너뜁니다: ${error.message}`, + ); + return; + } + this.logger.error( + "로그 이벤트 처리에 실패했습니다.", + error instanceof Error ? error.stack : String(error), + ); + throw error; + } +} +``` + +- `logIngestService.ingest(dto)`에서 `await` 제거. +- ES bulk flush는 **백그라운드에서 진행**되고, + Kafka 컨슈머는 빠르게 다음 메시지를 처리한다. +- 에러 로그 포워딩이 상대적으로 가벼운 작업이라면 굳이 비동기화하지 않아도 + **전체 throughput의 병목은 BulkIndexer 쪽에서 해소**된다. + +> 필요시, `errorLogForwarder.forward(dto)`도 fire-and-forget으로 바꿀 수 있지만, +> 현재 병목의 주범은 bulk flush이므로 우선 priority는 낮다. + +--- + +### 5.4 다른 모듈에서 BulkIndexerService 사용 시 점검 + +`BulkIndexerService`는 로그 외에 **스팬 등 다른 도메인에서도 재사용**될 수 있으므로, +코드베이스 전체에서 다음 패턴을 검색해 한 번에 수정해야 한다. + +- 검색 키워드 예시: + - `await this.bulkIndexer.enqueue(` + - `await bulkIndexer.enqueue(` +- 각 사용처를 다음과 같이 변경: + - `await bulkIndexer.enqueue(...);` → `bulkIndexer.enqueue(...);` + - 해당 서비스 메서드의 반환 타입도 필요 시 `Promise` → `void`로 변경. + +--- + +## 6. 이 리팩토링으로 기대되는 효과 + +1. **Kafka 소비 속도 회복** + - 컨슈머는 더 이상 bulk flush 완료를 기다리지 않으므로, + **파티션당 초당 수천 건 수준까지** 다시 메시지를 소비할 수 있다 + (실제 값은 ES/네트워크 성능에 따라 달라짐). + - 현재 관찰된 **분당 60건 수준의 throughput 문제는 사라지고**, 최소한 + bulk 도입 이전(분당 ~7,000건) 수준 이상으로 회복될 가능성이 매우 높다. + +2. **bulk의 이점 유지** + - ES 쪽에서는 여전히 **NDJSON `_bulk` API**를 사용하므로, + 단건 index 대비 **round-trip 횟수 감소 및 CPU/네트워크 효율 증가** 효과를 유지한다. + - `maxBatchSize`, `maxBatchBytes`, `flushIntervalMs` 설정을 조절하여 + 최적의 batch 크기를 찾을 수 있다. + +3. **구현 복잡도 최소** + - Kafka offset과 ES flush 간의 엄격한 트랜잭션 일관성은 포기하지만, + 그 대신 **코드 변경량은 작고, 논리도 단순**하다. + - MVP 단계에서 현실적인 타협점이다. + +--- + +## 7. 후속 고도화 아이디어 (필수 아님, 참고용) + +> 이 섹션은 Codex가 당장 구현할 필요는 없고, 차후 개선 시 참고용이다. + +1. **각 파티션별 in-flight 문서 수 제한** + - 메모리 사용량을 제한하기 위해 `MAX_BUFFERED_ITEMS` 같은 옵션을 두고, + 일정 개수 이상 쌓이면 `enqueue()`에서 잠시 block하는 전략 적용 가능. + +2. **eachBatch + manual commit 패턴** + - kafkajs의 `eachBatch`를 사용해: + - 특정 배치(예: N개의 메시지)를 메모리 버퍼에 넣은 뒤, + - 해당 배치에 대한 bulk flush가 성공하면 offset commit, + - 실패하면 배치 전체 재처리 등의 로직 구현 가능. + - 이 패턴은 **throughput + at-least-once 보장**을 모두 고려하는 고급 설계다. + +3. **Dead Letter Queue(DLQ) 도입** + - ES flush가 반복적으로 실패하는 문서는 Kafka DLQ 토픽으로 보내고, + 별도의 복구/분석 파이프라인에서 처리할 수 있다. + +--- + +## 8. Codex를 위한 구현 체크리스트 + +Codex가 이 문서를 바탕으로 코드를 수정할 때 따라야 할 **구체적인 단계**는 다음과 같다. + +1. **`bulk-indexer.service.ts` 수정** + - `BufferedItem` 인터페이스에서 `resolve`, `reject` 제거. + - `enqueue()`의 반환 타입을 `Promise` → `void`로 변경하고, + 내부에서 `new Promise` 생성 로직 제거. + - `executeFlush()`에서 `batch.forEach(item => item.resolve/reject)` 호출 제거. + - 로그 메시지를 위 예시처럼 정리. + +2. **`log-ingest.service.ts` 수정** + - `ingest()`의 시그니처를 `async ingest(...): Promise` → `ingest(...): void`로 변경. + - 내부의 `await this.bulkIndexer.enqueue(...)`를 `this.bulkIndexer.enqueue(...)`로 변경. + +3. **`log-consumer.controller.ts` 수정** + - `handleLogEvent()` 내부에서 `await this.logIngestService.ingest(dto);` 를 + `this.logIngestService.ingest(dto);`로 변경. + - 나머지 로직은 그대로 두되, 로그 메시지는 상황에 맞게 약간 수정 가능. + +4. **BulkIndexerService 다른 사용처 점검** + - 전체 코드베이스에서 `await bulkIndexer.enqueue` 패턴 검색. + - 동일한 방식으로 `await` 제거 및 함수 시그니처 조정. + +5. **빌드 및 테스트** + - TypeScript 컴파일 오류가 없는지 확인. + - 로컬 환경에서 Kafka → ES 파이프라인을 실행하고, + - 컨슈머 lag가 빠르게 감소하는지, + - ES에 초당 수천 건 수준으로 색인되는지 확인. + - ES `_cat/indices` 또는 Kibana에서 로그 도큐먼트 수를 확인해 throughput을 체감. + +이 체크리스트를 모두 수행하면, **현재 bulk 인덱싱으로 인해 발생한 극단적인 성능 저하 문제는 해결**되고, +MVP에 적합한 수준의 고성능 로그 파이프라인을 확보할 수 있다. diff --git a/backend/report.md b/backend/report.md new file mode 100644 index 0000000..5ad29eb --- /dev/null +++ b/backend/report.md @@ -0,0 +1,198 @@ +# APM 로그 Kafka 소비에서 bulk 인덱싱 성능 문제 분석 및 개선 방안 + +## 문제 상황 요약 + +- 배포 환경에서 APM 로그를 Kafka → Elasticsearch로 이전하는 도중 **bulk index** 모드로 리팩터링하였다. 기존의 단건 `index()` 방식에서는 1분에 약 7 000개 문서가 Elasticsearch에 색인되었지만, bulk index로 전환한 뒤에는 Kafka 토픽에 **100만건**의 로그가 쌓여 있음에도 불구하고 Elasticsearch 색인이 1분에 **약 60건** 수준으로 급격히 감소하였다. +- Kafka consumer의 lag는 모두 소비한 것으로 나타나지만, Elasticsearch에는 거의 데이터가 들어오지 않는다. 이는 Kafka에서 메시지를 빠르게 가져왔지만, 어딘가에서 블로킹이 발생해 메시지 처리가 지연되고 있다는 의미다. + +## 원인 분석 + +### 1. `handleLogEvent`가 bulk flush 완료까지 블록(block)한다 + +현재의 `LogConsumerController.handleLogEvent()`는 Kafka 메시지를 DTO로 변환한 뒤 `logIngestService.ingest(dto)`를 **`await`** 하고, 그 다음 `errorLogForwarder.forward(dto)`를 `await`한다. `LogIngestService.ingest()`는 내부적으로 `bulkIndexer.enqueue()`를 호출한다. + +`BulkIndexerService.enqueue()`는 문서를 메모리 버퍼에 추가하고, 버퍼가 일정 크기(`BULK_BATCH_SIZE`, 기본값 1000개)나 크기(`BULK_BATCH_BYTES_MB`)를 넘어서거나 플러시 간격(`BULK_FLUSH_INTERVAL_MS`, 기본 1 s)이 만료될 때만 `_bulk` API를 호출한다. 중요 포인트는 `enqueue()`가 **`Promise`를 반환하며, flush가 완료된 뒤에야 `resolve()`** 된다는 점이다. `ingest()`는 이 Promise를 그대로 `await` 하므로 **해당 문서가 포함된 bulk flush가 끝날 때까지 consumer가 멈춘다**. + +KafkaJS의 `eachMessage` API는 기본적으로 같은 파티션에서는 다음 메시지를 호출하지 않고 **이전 메시지 처리가 끝날 때까지 순차적으로(block) 처리한다**. KafkaJS 공식 문서는 `eachMessage` 핸들러가 세션 타임아웃보다 오래 블록되어서는 안 된다고 명시한다【880035443235562†L100-L107】. 또한 기본적으로 `eachMessage`는 각 파티션에 대해 순차 호출되며, 여러 메시지를 동시에 처리하려면 `partitionsConsumedConcurrently` 옵션을 설정해야 한다【880035443235562†L205-L223】. + +따라서 현재 구조는 다음과 같이 동작한다: + +1. KafkaJS가 특정 파티션에서 메시지를 하나 전달한다. `handleLogEvent()`는 `bulkIndexer.enqueue()`가 포함된 `ingest()`를 `await` 한다. +2. `enqueue()`는 버퍼가 가득 찼거나 타이머가 만료될 때까지(최대 1 s) 반환되지 않으므로, `handleLogEvent()`는 최대 1초 동안 멈춘다. +3. KafkaJS는 같은 파티션에 대해 다음 메시지를 전달하지 않기 때문에, **버퍼가 1 000개가 차기 전까지(또는 1초가 지난 후)** consumer는 새로운 메시지를 가져오지 못한다. +4. 결국 1 000개를 모으기 전에 flush 타이머가 만료되고, 1 초마다 1건만 flush되는 현상이 발생한다. 이 구조는 생산자(back‑pressure)를 고려해 설계된 것이나, 파티션당 직렬 처리와 겹쳐지면서 throughput을 수십건/분 수준으로 떨어뜨렸다. + +### 2. 동시성 부족과 작업 단위가 큼 + +- `eachMessage`는 같은 파티션에 대해 순차적으로 실행되므로, 네트워크 I/O나 _bulk_ API 같은 **비동기 작업이 긴 경우** 성능이 크게 떨어진다. KafkaJS 문서에 따르면 여러 파티션에서 메시지를 동시에 처리하려면 `partitionsConsumedConcurrently` 값을 늘려야 한다【880035443235562†L205-L223】. +- Elasticsearch 공식 가이드에서는 개별 문서 색인보다 bulk API를 사용하는 것이 성능에 유리하지만, **단일 스레드로 bulk 요청을 전송하면 Elasticsearch 클러스터의 자원을 충분히 활용하지 못한다**고 설명한다. 여러 스레드/프로세스로 동시에 bulk 요청을 보내는 것이 이상적이며, 요청 크기는 실험을 통해 찾되 너무 큰 요청은 오히려 메모리 압박을 줄 수 있다고 경고한다【676536220397772†L905-L931】. +- WWT의 KafkaJS 경험 보고서는 `eachMessage`를 이용하여 외부 API 호출을 하던 코드를 `eachBatch` + 배치 엔드포인트로 전환한 결과 **8–10배 성능 향상**을 얻었다고 언급한다【901272165489513†L281-L285】. 이는 단건 처리보다 배치 처리, 그리고 배치별 커밋이 성능에 큰 영향을 준다는 실증적인 사례다. + +## 설계상의 문제점 정리 + +| 문제 | 증상/근거 | +|---|---| +| **await로 인해 Kafka consumer가 bulk flush까지 블로킹** | `enqueue()`의 Promise가 flush 완료 후에야 resolve돼 `handleLogEvent()`가 멈추고, KafkaJS는 같은 파티션의 다음 메시지를 호출하지 않는다. `eachMessage`는 블로킹 연산을 피해야 한다는 문서【880035443235562†L100-L107】. | +| **파티션당 직렬 처리** | 기본 설정에서는 `eachMessage`가 순차적으로 실행되어 네트워크 지연이 throughput을 제한한다. KafkaJS는 `partitionsConsumedConcurrently` 옵션으로 여러 메시지를 동시에 처리할 수 있음을 명시한다【880035443235562†L205-L223】. | +| **Bulk 요청 크기/타이머 설정이 부적합** | `maxBatchSize=1000`, `flushIntervalMs=1000`은 파티션당 초당 하나의 flush만 발생해 메시지가 버퍼에 오래 머무른다. Elastic 문서는 bulk 요청 크기와 워커 수를 실험적으로 맞추고 너무 크면 성능이 떨어질 수 있다고 설명한다【676536220397772†L905-L931】. | +| **단일 스레드에서만 bulk 호출** | Elastic 문서는 여러 스레드/프로세스로 bulk 요청을 보내야 클러스터 자원을 최대한 활용할 수 있다고 권장한다【676536220397772†L924-L946】. 현재 `BulkIndexerService`는 `maxParallelFlushes` 옵션을 제공하지만 기본값 2로 제한돼 있다. | + +## 개선 방안 + +### 1. 비동기 처리 개선 – consumer를 flush 완료까지 기다리지 않기 + +메시지 소비는 빠르게 진행되고, bulk flush는 백그라운드에서 비동기로 처리되도록 설계하는 것이 중요하다. 따라서 `bulkIndexer.enqueue()`를 호출할 때 반환되는 `Promise`를 **기다리지 않고** Fire‑and‑forget 방식으로 큐에 추가해야 한다. 실패 시에는 로깅하고 메트릭을 올리거나 Kafka 오프셋 커밋을 별도로 관리해야 한다. + +구체적으로: + +- `LogIngestService.ingest()`에서 `await this.bulkIndexer.enqueue(...)`를 제거하고, 반환되는 Promise를 처리하지 않은 채로 버퍼에 넣는다. 에러는 `.catch()`에서 로깅한다. 이렇게 하면 Kafka consumer는 flush 타이밍과 무관하게 다음 메시지를 계속 받을 수 있다. +- `LogConsumerController.handleLogEvent()`에서도 `await`를 제거해 consumer 핸들러가 블로킹되지 않도록 한다. 에러 처리를 위해 반환된 Promise를 별도로 캐치하고 로깅하거나, 실패한 경우 해당 오프셋을 별도 큐에 저장해 재처리할 수 있다. + +이 방법의 단점은 **오프셋 커밋 타이밍이 빨라질 수 있어 일부 문서가 Elasticsearch에 완전히 색인되기 전에 메시지를 성공으로 간주할 수 있다는 것**이다. 만약 정확한 처리 보장이 필요하다면, 아래의 `eachBatch` 패턴이나 manual commit을 활용해 flush 이후에만 커밋하도록 관리하는 방식을 채택한다. + +### 2. 각 파티션 병렬 처리 – `partitionsConsumedConcurrently` 사용 + +KafkaJS는 기본적으로 파티션당 하나의 `eachMessage` 호출만 실행한다. 비동기 작업이 길어지면 전체 처리량이 급격히 떨어지므로, 여러 파티션을 동시에 처리하도록 `partitionsConsumedConcurrently`를 늘려야 한다. KafkaJS 문서에 따르면 `partitionsConsumedConcurrently` 값을 올리면 여러 파티션의 메시지를 동시에 처리하면서도 **같은 파티션 내 순서를 보장**한다【880035443235562†L205-L223】. NestJS에서는 microservice 옵션에 `consumer: { partitionsConsumedConcurrently: N }`를 지정하여 이를 설정할 수 있다. + +예를 들어, 토픽이 4개 파티션으로 구성돼 있고 CPU 코어가 충분하다면 `partitionsConsumedConcurrently: 2` 또는 `3`을 설정하여 파티션 간 병렬성을 올릴 수 있다. 이 값은 파티션 수보다 크게 설정해도 효과가 없으므로, 토픽의 파티션 수와 시스템 성능을 고려해 실험적으로 결정한다. + +### 3. `eachBatch` 및 manual commit 패턴으로 전환 + +bulk 인덱싱은 본질적으로 여러 레코드를 하나의 요청으로 묶는 작업이므로, Kafka consumer에서도 **배치 단위로 메시지를 가져와 처리**하는 것이 더 적합하다. `eachBatch` 핸들러는 메시지 배열과 함께 `resolveOffset`/`commitOffsetsIfNecessary`/`heartbeat` 등 유용한 API를 제공한다【880035443235562†L109-L166】. 이를 이용하면 다음과 같은 장점이 있다: + +- 한 배치의 메시지를 모두 `bulkIndexer.enqueue()`로 넣은 뒤 바로 flush를 요청할 수 있다. +- flush가 완료된 후 `commitOffsetsIfNecessary()`를 호출하여 해당 배치의 마지막 오프셋만 커밋함으로써 **at‑least‑once** 처리를 보장한다. +- `eachBatch`는 기본적으로 KafkaJS 내부에서 batch 당 자동 커밋을 수행하며, `eachBatchAutoResolve`를 제어하여 원하는 커밋 시점을 조정할 수 있다. + +WWT의 사례에서도 `eachMessage` 대신 `eachBatch` 패턴을 사용하고 외부 API도 배치로 묶자 **8–10배 이상 성능 향상**을 경험했다고 보고한다【901272165489513†L281-L285】. + +### 4. Bulk flush 파라미터 조정 및 병렬 flush 증가 + +- **배치 크기와 플러시 주기 조정** : 현재 `BULK_BATCH_SIZE=1000`, `BULK_FLUSH_INTERVAL_MS=1000`은 빈도가 너무 낮아 batch가 1000개가 채워지지 않는 상황에서 1초마다 하나의 flush만 발생한다. Elastic 문서에서는 최적의 bulk 크기를 실험적으로 찾고, 너무 큰 요청은 메모리 압박을 유발할 수 있으므로 수십 MB를 넘기지 않도록 권고한다【676536220397772†L905-L916】. 예를 들어 초기 배치 크기를 200–500개로 줄이고 flush 간격을 100–200 ms로 줄이면 메시지가 버퍼에 오래 머무르지 않고 빠르게 색인된다. +- **동시 flush 수 늘리기** : `BulkIndexerService`의 `maxParallelFlushes`는 동시 flush 요청 수를 제한한다. 현재 기본값은 2인데, Elasticsearch 클러스터 여유가 있다면 이 값을 늘려서 동시에 여러 bulk 요청을 보내도록 한다. Elastic 문서에서도 단일 스레드로는 클러스터의 색인 성능을 충분히 활용하지 못한다고 지적하며, 여러 워커/스레드를 통해 데이터를 전송해야 한다고 조언한다【676536220397772†L924-L931】. + +## 코드 수정안 (예시) + +아래 수정안은 기존 구조를 크게 바꾸지 않으면서 consumer 블로킹 문제를 해결하는 접근법이다. 정확한 구현은 서비스 요구 사항(정확한 처리 보장 vs. 처리량 우선)을 고려해 조정한다. + +### 1. `LogIngestService.ingest()` – bulk enqueue를 Fire‑and‑forget으로 변경 + +```ts +// src/apm/log-ingest/log-ingest.service.ts +async ingest(dto: LogEventDto): Promise { + const document: LogDocument = { + "@timestamp": this.resolveTimestamp(dto.timestamp), + type: "log", + service_name: dto.service_name, + environment: dto.environment, + trace_id: dto.trace_id, + span_id: dto.span_id, + level: dto.level, + message: dto.message, + http_method: dto.http_method, + http_path: dto.http_path, + http_status_code: dto.http_status_code, + labels: this.normalizeLabels(dto.labels), + ingestedAt: new Date().toISOString(), + }; + + // 버퍼에 적재한 뒤 기다리지 않는다. 실패 시에는 로깅만 하고, 필요하다면 재시도 로직을 별도로 구현한다. + void this.bulkIndexer + .enqueue(LogIngestService.STREAM_KEY, document) + .catch((err) => { + // handle error: 메트릭 증가, 모니터링, 재큐잉 등 + this.logger.error('Bulk enqueue failed', err); + }); +} +``` + +### 2. `LogConsumerController.handleLogEvent()` – 비동기 호출로 변경 + +```ts +// src/log-consumer/log-consumer.controller.ts +@EventPattern(process.env.KAFKA_APM_LOG_TOPIC ?? 'apm.logs') +async handleLogEvent(@Ctx() context: KafkaContext): Promise { + const value = context.getMessage().value; + if (value == null) { + this.logger.warn('Kafka 메시지에 본문이 없어 처리를 건너뜁니다.'); + return; + } + + try { + const dto = this.parsePayload(value); + // bulk 인덱싱은 기다리지 않고 큐에 추가 + this.logIngestService.ingest(dto); + // 에러 로그 포워딩은 빠르게 끝나므로 동시에 실행하도록 하되, 필요하다면 await + void this.errorLogForwarder.forward(dto).catch((err) => { + this.logger.error('Error forward failed', err); + }); + // 처리된 메시지 카운트는 즉시 증가시킨다 + this.throughputTracker.markProcessed(); + this.logger.debug( + `로그 이벤트 처리 시작: topic=${context.getTopic()} partition=${context.getPartition()}`, + ); + } catch (error) { + if (error instanceof InvalidLogEventError) { + this.logger.warn( + `유효하지 않은 로그 이벤트를 건너뜁니다: ${error.message}`, + ); + return; + } + this.logger.error( + '로그 이벤트 처리에 실패했습니다.', + error instanceof Error ? error.stack : String(error), + ); + // 오류를 throw하면 KafkaJS가 해당 오프셋을 커밋하지 않고 재처리한다. + throw error; + } +} +``` + +이렇게 수정하면 `handleLogEvent()`는 bulk flush 완료 여부와 무관하게 다음 메시지를 받을 수 있어 consumer가 블로킹되지 않는다. 단, flush 실패와 오프셋 커밋 사이의 정확한 처리 보증을 위해서는 manual commit을 적용하거나 실패한 문서를 별도 큐에 저장하는 전략이 필요하다. + +### 3. Kafka consumer 동시성 설정 + +KafkaJS에서 파티션을 병렬로 처리하려면 consumer 실행 시 `partitionsConsumedConcurrently`를 설정한다. NestJS에서는 microservice 옵션에 다음과 같이 추가한다. 이 설정은 토픽의 파티션 수와 시스템 리소스를 고려해 조정한다. + +```ts +// 예시: KafkaModule 등록 시 consumer 옵션에 partitionsConsumedConcurrently 설정 +KafkaModule.register({ + client: { + clientId: 'apm-log-consumer', + brokers: [process.env.KAFKA_BROKER_URL], + }, + consumer: { + groupId: process.env.KAFKA_GROUP_ID ?? 'apm-log-group', + // 파티션 3개를 동시에 소비. 값은 파티션 수보다 크지 않아야 함 + partitionsConsumedConcurrently: Number.parseInt(process.env.KAFKA_CONCURRENT_PARTITIONS ?? '2', 10), + }, +}); +``` + +`ConsumerRunConfig` 타입 정의에는 `partitionsConsumedConcurrently` 옵션이 존재하며, `eachBatch`/`eachMessage`와 함께 사용할 수 있다【217620888615763†L2674-L2678】. 이 값을 증가시키면 여러 파티션에서 메시지를 동시에 처리하면서도 **각 파티션 내 순서는 유지**된다【880035443235562†L205-L223】. + +### 4. `eachBatch` 패턴으로 전환 (선택적) + +보다 강력한 처리량과 정확한 오프셋 관리가 필요하다면 `eachBatch`를 활용한 구조를 고려한다. 간략한 흐름은 다음과 같다: + +1. Kafka consumer를 `autoCommit: false` 또는 `eachBatchAutoResolve: false`로 설정한다. +2. `eachBatch` 핸들러에서 `batch.messages` 배열을 순회하며 각 메시지를 DTO로 변환하고 `bulkIndexer.enqueue()`에 넣는다. 이 때 `enqueue()`는 기다리지 않고 바로 반환한다. +3. 배치의 모든 메시지가 큐에 들어가면 즉시 `bulkIndexer.triggerFlush()`(별도의 public API를 만들거나 `maxBatchSize`/`flushIntervalMs`를 낮춤)로 flush를 요청한다. +4. flush가 성공하면 `resolveOffset`으로 배치의 마지막 오프셋을 표시하고 `commitOffsetsIfNecessary()`를 호출해 해당 오프셋을 커밋한다. 실패하면 예외를 throw해 KafkaJS가 동일 배치를 재처리하게 한다. + +이 방식은 메시지 처리와 flush를 분리하면서도 flush가 성공해야만 오프셋을 커밋하므로 **at‑least‑once** 보장에 적합하다. 구현 난이도가 높지만 성능 향상과 신뢰성을 동시에 얻을 수 있다. + +### 5. Bulk flush 파라미터 조정 + +환경 변수나 설정 파일을 통해 다음 값을 조정한다: + +- `BULK_BATCH_SIZE` – 한 번에 묶을 문서 수. 초기에는 100–500 사이로 설정해 메시지가 버퍼에 머무는 시간을 줄인다. +- `BULK_BATCH_BYTES_MB` – 배치의 최대 바이트 수. 너무 크면 Elasticsearch가 과부하될 수 있으므로 수십 MB 이하를 권장한다【676536220397772†L905-L916】. +- `BULK_FLUSH_INTERVAL_MS` – 타이머 기반 flush 주기. 실험적으로 100–200 ms로 줄이면 작은 배치라도 지연 없이 flush된다. +- `BULK_MAX_PARALLEL_FLUSHES` – 동시 flush 횟수. 클러스터가 견딜 수 있는 범위에서 값을 늘리면 여러 배치를 동시에 색인할 수 있다【676536220397772†L924-L931】. + +## 결론 + +현재 문제는 bulk 인덱싱 자체가 아니라, **consumer가 flush 완료까지 블로킹**되는 설계와 **파티션당 직렬 처리**가 결합되면서 발생한 병목이다. KafkaJS 문서가 경고하듯 `eachMessage` 핸들러는 장시간 블로킹을 피해야 하며, 여러 파티션을 동시에 처리하도록 `partitionsConsumedConcurrently`를 설정해야 한다【880035443235562†L100-L107】【880035443235562†L205-L223】. 또한 Elasticsearch에서는 bulk 요청을 여러 스레드에서 보내야 최대 색인 속도를 달성할 수 있고, 배치 크기와 flush 주기를 실험적으로 튜닝해야 한다【676536220397772†L905-L931】. + +위에서 제시한 Fire‑and‑forget 방식, `partitionsConsumedConcurrently` 활용, `eachBatch` 패턴, flush 파라미터 조정 등을 적용하면 배포 환경에서도 **초당 수천 건** 수준의 처리량을 회복할 수 있을 것이다. 추가로, 실패한 문서의 재처리와 정확한 오프셋 관리를 위해 로그 저장소 상태와 Kafka 오프셋을 함께 관리하는 로직을 설계하는 것이 좋다. diff --git a/backend/rollup_metrics_spec.md b/backend/rollup_metrics_spec.md index 83ab624..f6233ff 100644 --- a/backend/rollup_metrics_spec.md +++ b/backend/rollup_metrics_spec.md @@ -6,7 +6,7 @@ 현재 APM 시스템은 다음과 같이 동작합니다: -- **데이터 수집 파이프라인**: 애플리케이션에서 생성되는 로그/스팬을 Kafka 토픽(`apm.logs`, `apm.spans`)에 발행하고, `stream‑processor` 서비스가 이를 소비하여 Elasticsearch 데이터 스트림(`logs-apm.*`, `traces-apm.*`)에 인덱싱합니다. +- **데이터 수집 파이프라인**: 애플리케이션에서 생성되는 로그/스팬을 Kafka 토픽(`apm.logs`, `apm.spans`)에 발행하고, `stream‑processor` 서비스가 이를 소비하여 Elasticsearch 데이터 스트림(`logs-apm.*`, `traces-apm.*`)에 인덱싱합니다. 이때 stream-processor는 `apm_bulk_design.md`에 정의된 BulkIndexer로 문서를 버퍼링한 뒤 `_bulk` API로 저장하며, 이 단계에서는 어떤 롤업/집계도 수행하지 않습니다. - **조회 API(Query‑API)**: NestJS 기반 서비스로, Elasticsearch에서 시간 범위에 맞는 로그/스팬을 조회하면서 집계(`percentiles`, `sum`, `filter` 등)를 수행하여 메트릭(p50, p90, p95, request_total, error_rate)을 실시간으로 계산하고 응답합니다. **문제점**: 장기 구간(예: 최근 1시간 이상)을 지속적으로 조회할 경우, 매번 원본 로그/스팬 전체를 스캔해야 하므로 Elasticsearch 및 `stream‑processor`의 부하가 증가합니다. 또한 SLA/SLO 보고서나 주간/월간 대시보드와 같이 장기 데이터를 자주 조회하는 경우 성능과 비용 문제가 발생합니다. @@ -16,6 +16,7 @@ 1. **롤업(Roll‑up) 인덱스 도입**: 로그/스팬 원본을 일정한 시간 단위(이 문서에서는 1분 버킷으로 정의)로 미리 집계하여 별도의 인덱스에 저장합니다. 이 인덱스는 p50/p90/p95, 요청 수, 오류 수 등을 포함한 요약 메트릭을 제공합니다. 2. **짧은 구간은 원본 집계 + 캐시 유지**: 최근 몇 분(예: 5분 이내)은 여전히 원본 인덱스에서 실시간 집계하여 최신 데이터를 제공합니다. 해당 부분은 Redis 캐시(10초 시간 버킷) 전략을 사용합니다. 3. **Query‑API 개선**: 요청 범위에 따라 raw 인덱스, 롤업 인덱스 또는 두 데이터를 결합하여 응답을 생성합니다. 이 과정은 완전히 자동화되어야 하며, 기존 엔드포인트와 스키마를 유지합니다. +4. **롤업/집계 파이프라인 분리**: ingest 단계(Bulk 색인)에서는 오로지 문서 저장에만 집중하고, 1분 버킷 카운트 같은 집계는 Elasticsearch Transform·Aggregation API·Cron job 등 별도 후처리 경로로 구현합니다. 컨슈머에서 집계까지 수행하면 per-message 처리 시간이 길어져 throughput이 크게 떨어지므로, 본 스펙의 모든 롤업/통계 계산은 **색인 후 비동기 파이프라인**에서 이루어져야 합니다. ## 2. 롤업 인덱스 설계 @@ -83,22 +84,19 @@ ### 3.1 Stream‑Processor 변경 -현재 `stream‑processor`는 Kafka `apm.logs`/`apm.spans` 토픽을 소비하여 raw 데이터를 ES에 인덱싱합니다. 여기에 **롤업 생산 기능**을 추가합니다. - -1. **슬라이딩 윈도우 집계** - - 프로듀서 측에서 로그/스팬을 소비할 때, `service_name`과 `environment`를 키로 하여 **1분 단위 슬라이딩 윈도우**를 관리합니다. - - RxJS 또는 Node.js의 시간 기반 버퍼링을 활용하여, 각 1분 간격이 끝날 때마다 해당 분에 속하는 이벤트들의 통계를 계산합니다. -2. **메트릭 계산** - - `duration_ms` 또는 해당 스팬/로그의 처리 시간 필드를 이용해 p50/p90/p95/p99을 계산합니다. JavaScript에서 t‑digest 알고리즘 라이브러리(예: `tdigest` npm 패키지)를 사용하거나, 간단히 배열 정렬 후 퍼센타일을 계산할 수 있습니다. - - `request_count`는 이벤트 수, `error_count`는 로그 수준(`ERROR` 이상) 또는 스팬의 `status != OK`인 건수로 정의합니다. - - `error_rate = error_count / request_count` 로 계산합니다. `request_count=0`이면 0으로 둡니다. -3. **인덱싱** - - 집계가 끝나면 위에서 정의한 필드 구조로 JSON 문서를 생성하여, 해당 서비스의 `metrics-apm.-` 데이터 스트림에 추가합니다. - - 인덱싱 시점의 타임스탬프는 버킷의 시작 시각(예: 분 경계)을 사용합니다. -4. **오류 처리** - - 롤업 인덱싱에 실패하더라도 raw 인덱싱에는 영향이 없어야 합니다. 에러 발생 시 별도의 로깅 및 재시도 로직을 넣되, 롤업 실패로 인해 전체 수집이 지연되지 않게 합니다. -5. **구성 가능성** - - 롤업 활성화 여부, 버킷 크기, 메트릭 대상(서비스/endpoint), 보존 기간 등을 환경 변수로 설정할 수 있게 만듭니다. +현재 `stream‑processor`는 Kafka `apm.logs`/`apm.spans`를 BulkIndexer를 통해 raw 데이터 스트림에 저장합니다. 롤업 생산은 ingest 경로가 아닌 **별도 후처리 파이프라인**으로 구현합니다. 접근 방식은 다음 중 하나를 택합니다. + +1. **Elasticsearch Transform**: + - `logs-apm.*`와 `traces-apm.*`를 원천 인덱스로 지정하고 1분 버킷 Transform을 생성합니다. + - Transform은 elasticsearch가 관리하므로 stream‑processor의 CPU/메모리를 소모하지 않고 롤업 인덱스(`metrics-apm.*`)를 채웁니다. + - 퍼센타일 계산은 ES `percentiles` 집계를 사용합니다. +2. **별도 Aggregator(Worker)**: + - Query-API 혹은 전용 워커 프로세스가 일정 주기(예: 30초)에 raw 인덱스를 집계하여 롤업 문서를 생성한 뒤 Bulk로 색인합니다. + - 이 워커는 stream‑processor와 별도의 애플리케이션으로 운영되어 ingest 경로와 자원을 공유하지 않습니다. +3. **Redis/메시지 기반 파이프라인**: + - 임시 저장소에 엔드포인트별 카운터를 적재 후, 일정 주기에 롤업 인덱스로 밀어 넣는 구조도 가능합니다. 핵심은 ingest 서비스와 집계 로직을 명확히 분리하는 것입니다. + +선택한 방식과 관계없이 롤업 파이프라인은 실패해도 raw 색인을 막지 않아야 하며, stream‑processor의 BulkIndexer 처리량에 영향을 주지 않도록 독립적으로 운영해야 합니다. ### 3.2 ES Transform 사용 (대체안) @@ -198,7 +196,7 @@ Transform 방식은 인프라 차원에서 설정하면 되지만, 이번 명세 ## 5. 배포 및 운영 지침 1. **롤업 인덱스 템플릿/ILM 적용**: 앞서 제시한 매핑과 보존 정책을 Elasticsearch 클러스터에 등록합니다. 수집 환경별(namespace별)로 템플릿 이름을 구분합니다. -2. **stream‑processor 업데이트 배포**: 롤업 집계 모듈을 통합한 후, Canary 배포 등으로 점진적으로 도입합니다. 배포 전에 Kafka lag과 ES 인덱싱 지연을 모니터링하여, 롤업 연산이 수집 성능에 미치는 영향을 확인합니다. +2. **stream‑processor 업데이트 배포**: 롤업 집계 모듈을 통합한 후, Canary 배포 등으로 점진적으로 도입합니다. 배포 전에 Kafka lag과 ES 인덱싱 지연을 모니터링하여, 롤업 연산이 **별도의 후처리 파이프라인에서 동작하고 ingest 경로를 방해하지 않는지** 확인합니다. 3. **모니터링**: 롤업 인덱스 생성률, 문서 수, 집계 지연 등을 대시보드로 구성하여 이상을 감지합니다. raw 인덱스와 롤업 인덱스의 데이터가 동일 기간에 대해 일치하는지도 확인합니다. 4. **백필(backfill)**: 롤업 도입 이전 기간에 대해 과거 데이터를 롤업 인덱스에 채워 넣어야 하는 경우, 별도의 배치 프로세스를 만들어 일정 구간씩 raw 데이터를 읽어 롤업 문서를 생성합니다. 백필 작업은 스로틀링(throttling)을 걸어서 클러스터 부하를 피하도록 합니다. diff --git a/backend/src/notification/notification.module.ts b/backend/src/notification/notification.module.ts deleted file mode 100644 index e2eccd3..0000000 --- a/backend/src/notification/notification.module.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Module } from "@nestjs/common"; -import { NotificationService } from "./notification.service"; - -@Module({ - providers: [NotificationService], - exports: [NotificationService], -}) -export class NotificationModule {} diff --git a/backend/src/notification/notification.service.ts b/backend/src/notification/notification.service.ts deleted file mode 100644 index 26758fb..0000000 --- a/backend/src/notification/notification.service.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { Injectable, Logger } from "@nestjs/common"; -import { Transporter } from "nodemailer"; -import * as nodemailer from "nodemailer"; - -/** - * 알람을 울릴 때 필요한건 - * 서비스명 - * 현재수치 - * 메트릭이름 - * 인증관련은 미들웨어 - */ -export interface SLOAlertData { - service: string; - metric: string; - currentValue: number; -} - -@Injectable() -export class NotificationService { - private readonly transporter: Transporter | null = null; - private readonly fromEmail: string; - private readonly defaultRecipient: string; - private readonly logger = new Logger(NotificationService.name); - - constructor() { - const gmailUser = process.env.GMAIL_USER; - const gmailAppPassword = process.env.GMAIL_APP_PASSWORD; - this.defaultRecipient = process.env.EMAIL_DEFAULT_RECIPIENT ?? ""; - this.fromEmail = gmailUser ?? "no-reply@panopticon.com"; - - if (!gmailUser || !gmailAppPassword) { - this.logger.warn( - "Gmail credentials not set (GMAIL_USER, GMAIL_APP_PASSWORD). Email notifications will be disabled.", - ); - return; - } - - this.transporter = nodemailer.createTransport({ - service: "gmail", - auth: { - user: gmailUser, - pass: gmailAppPassword, - }, - }); - - this.transporter.verify((error) => { - if (error) { - this.logger.error("Transporter verification failed:", error); - } else { - this.logger.log("✅ Transporter is ready to send emails"); - } - }); - - this.logger.log("Email provider initialized2"); - } - - /** - * SLO 위반 알림 전송 (Slack + Email) - */ - async sendSLOAlert(data: SLOAlertData): Promise { - if (!this.transporter) { - this.logger.warn("Email transporter not initialized. Email not sent."); - return false; - } - - try { - const recipients = this.defaultRecipient; - - if (recipients.length === 0) { - this.logger.warn("No email recipients specified. Email not sent."); - return false; - } - - await this.transporter.sendMail({ - from: `"Panopticon Alert" <${this.fromEmail}>`, - to: recipients, - subject: "Panopticon Alert", - text: `비~상~ ${data.service}의 ${data.metric}이 ${data.currentValue}..!`, - }); - - this.logger.log(`Email sent to ${recipients}`); - return true; - } catch (error) { - this.logger.error("Failed to send email", error); - return false; - } - } -} diff --git a/backend/src/shared/common/kafka/kafka.config.ts b/backend/src/shared/common/kafka/kafka.config.ts index 8fc6c33..956d5eb 100644 --- a/backend/src/shared/common/kafka/kafka.config.ts +++ b/backend/src/shared/common/kafka/kafka.config.ts @@ -143,6 +143,10 @@ export function createKafkaMicroserviceOptions( consumer: { groupId: params.groupId, allowAutoTopicCreation: params.allowAutoTopicCreation ?? true, + partitionsConsumedConcurrently: Number.parseInt( + process.env.KAFKA_CONCURRENT_PARTITIONS ?? "3", + 10, + ), }, }, }; diff --git a/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts b/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts index 8c49ab2..2f50ff1 100644 --- a/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts +++ b/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts @@ -1,9 +1,10 @@ import { Module } from "@nestjs/common"; import { ApmInfrastructureModule } from "../../../shared/apm/apm.module"; +import { BulkIngestModule } from "../../common/bulk-ingest.module"; import { LogIngestService } from "./log-ingest.service"; @Module({ - imports: [ApmInfrastructureModule], + imports: [ApmInfrastructureModule, BulkIngestModule], providers: [LogIngestService], exports: [LogIngestService], }) diff --git a/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts b/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts index e8ff95b..180ee07 100644 --- a/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts +++ b/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts @@ -1,16 +1,19 @@ import { Injectable } from "@nestjs/common"; -import { ApmLogRepository } from "../../../shared/apm/logs/log.repository"; import type { LogEventDto } from "../../../shared/apm/logs/dto/log-event.dto"; import type { LogDocument } from "../../../shared/apm/logs/log.document"; +import type { LogStreamKey } from "../../../shared/logs/log-storage.service"; +import { BulkIndexerService } from "../../common/bulk-indexer.service"; /** * Kafka에서 들어온 로그 이벤트를 Elasticsearch에 저장하는 서비스 */ @Injectable() export class LogIngestService { - constructor(private readonly repository: ApmLogRepository) {} + private static readonly STREAM_KEY: LogStreamKey = "apmLogs"; - async ingest(dto: LogEventDto): Promise { + constructor(private readonly bulkIndexer: BulkIndexerService) {} + + ingest(dto: LogEventDto): void { const document: LogDocument = { "@timestamp": this.resolveTimestamp(dto.timestamp), type: "log", @@ -27,7 +30,8 @@ export class LogIngestService { ingestedAt: new Date().toISOString(), }; - await this.repository.save(document); + // 로그 문서를 bulk 버퍼에 적재하고 즉시 반환해 Kafka 처리를 막지 않는다. + this.bulkIndexer.enqueue(LogIngestService.STREAM_KEY, document); } /** diff --git a/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts b/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts index fc51b50..a154c13 100644 --- a/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts +++ b/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts @@ -1,9 +1,10 @@ import { Module } from "@nestjs/common"; import { ApmInfrastructureModule } from "../../../shared/apm/apm.module"; +import { BulkIngestModule } from "../../common/bulk-ingest.module"; import { SpanIngestService } from "./span-ingest.service"; @Module({ - imports: [ApmInfrastructureModule], + imports: [ApmInfrastructureModule, BulkIngestModule], providers: [SpanIngestService], exports: [SpanIngestService], }) diff --git a/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts b/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts index 20443ec..dcc8fd7 100644 --- a/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts +++ b/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts @@ -1,16 +1,19 @@ import { Injectable } from "@nestjs/common"; -import { SpanRepository } from "../../../shared/apm/spans/span.repository"; import type { SpanEventDto } from "../../../shared/apm/spans/dto/span-event.dto"; import type { SpanDocument } from "../../../shared/apm/spans/span.document"; +import type { LogStreamKey } from "../../../shared/logs/log-storage.service"; +import { BulkIndexerService } from "../../common/bulk-indexer.service"; /** * 스팬 이벤트를 Elasticsearch에 저장하는 서비스 */ @Injectable() export class SpanIngestService { - constructor(private readonly repository: SpanRepository) {} + private static readonly STREAM_KEY: LogStreamKey = "apmSpans"; - async ingest(dto: SpanEventDto): Promise { + constructor(private readonly bulkIndexer: BulkIndexerService) {} + + ingest(dto: SpanEventDto): void { const document: SpanDocument = { "@timestamp": this.resolveTimestamp(dto.timestamp), type: "span", @@ -30,7 +33,8 @@ export class SpanIngestService { ingestedAt: new Date().toISOString(), }; - await this.repository.save(document); + // 스팬 문서를 BulkIndexer 버퍼에 적재해 Kafka 처리가 지연되지 않도록 한다. + this.bulkIndexer.enqueue(SpanIngestService.STREAM_KEY, document); } /** diff --git a/backend/src/stream-processor/common/bulk-indexer.service.ts b/backend/src/stream-processor/common/bulk-indexer.service.ts new file mode 100644 index 0000000..5e4c7ea --- /dev/null +++ b/backend/src/stream-processor/common/bulk-indexer.service.ts @@ -0,0 +1,224 @@ +import { Injectable, Logger, OnModuleDestroy } from "@nestjs/common"; +import type { BaseApmDocument } from "../../shared/apm/common/base-apm.repository"; +import { + LogStorageService, + type LogStreamKey, +} from "../../shared/logs/log-storage.service"; +import type { Client } from "@elastic/elasticsearch"; + +interface BufferedItem { + index: string; + document: BaseApmDocument; + size: number; +} + +/** + * Elasticsearch Bulk API를 이용해 로그/스팬을 배치 단위로 색인하는 유틸리티 + * - 버퍼에 문서를 모았다가 크기/시간 조건을 만족하면 NDJSON 형태로 전송 + * - 동시 플러시 수를 제한해 ES 클러스터 과부하를 막는다 + */ +@Injectable() +export class BulkIndexerService implements OnModuleDestroy { + private readonly logger = new Logger(BulkIndexerService.name); + private readonly client: Client; + private readonly maxBatchSize: number; + private readonly maxBatchBytes: number; + private readonly flushIntervalMs: number; + private readonly maxParallelFlushes: number; + + private buffer: BufferedItem[] = []; + private bufferedBytes = 0; + private flushTimer: NodeJS.Timeout | null = null; + private inFlightFlushes = 0; + private pendingFlush = false; + + constructor(private readonly storage: LogStorageService) { + this.client = this.storage.getClient(); + this.maxBatchSize = Math.max( + 1, + Number.parseInt(process.env.BULK_BATCH_SIZE ?? "1000", 10), + ); + const byteLimitMb = Number.parseFloat( + process.env.BULK_BATCH_BYTES_MB ?? "10", + ); + this.maxBatchBytes = Math.max(1024, Math.floor(byteLimitMb * 1024 * 1024)); + this.flushIntervalMs = Math.max( + 100, + Number.parseInt(process.env.BULK_FLUSH_INTERVAL_MS ?? "1000", 10), + ); + this.maxParallelFlushes = Math.max( + 1, + Number.parseInt(process.env.BULK_MAX_PARALLEL_FLUSHES ?? "3", 10), + ); + } + + /** + * Bulk 버퍼에 문서를 추가하고 조건을 만족하면 즉시 플러시한다. + * - flush 완료를 기다리지 않으므로 Kafka 컨슈머가 block 되지 않는다. + */ + enqueue(streamKey: LogStreamKey, document: BaseApmDocument): void { + const indexName = this.storage.getDataStream(streamKey); + const size = + Buffer.byteLength(JSON.stringify({ index: { _index: indexName } })) + + Buffer.byteLength(JSON.stringify(document)) + + 2; + + this.buffer.push({ index: indexName, document, size }); + this.bufferedBytes += size; + if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else { + this.ensureFlushTimer(); + } + } + + async onModuleDestroy(): Promise { + if (this.buffer.length > 0) { + await this.flushRemaining(); + } + } + + /** + * 버퍼가 비어있지 않다면 즉시 flush를 수행한다. + */ + /** + * 버퍼가 차거나 타이머가 만료되었을 때 실제 bulk 요청을 트리거한다. + * 동시 플러시 한도에 걸리면 pending 플래그만 세우고 이후에 재시도한다. + */ + private triggerFlush(): void { + if (this.buffer.length === 0) { + return; + } + if (this.inFlightFlushes >= this.maxParallelFlushes) { + this.pendingFlush = true; + return; + } + + const batch = this.drainBuffer(); + if (batch.length === 0) { + return; + } + + this.inFlightFlushes += 1; + void this.executeFlush(batch).finally(() => { + this.inFlightFlushes -= 1; + if (this.pendingFlush) { + this.pendingFlush = false; + this.triggerFlush(); + } else if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else if (this.buffer.length > 0) { + this.ensureFlushTimer(); + } + }); + } + + /** + * 버퍼 건수 혹은 바이트 수가 임계값을 초과했는지 검사한다. + */ + private shouldFlushBySize(): boolean { + return ( + this.buffer.length >= this.maxBatchSize || + this.bufferedBytes >= this.maxBatchBytes + ); + } + + /** + * 일정 시간 동안 문서가 적게 들어오는 경우를 대비해 타이머 기반 플러시를 예약한다. + */ + private ensureFlushTimer(): void { + if (this.flushTimer || this.buffer.length === 0) { + return; + } + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.triggerFlush(); + }, this.flushIntervalMs); + } + + /** + * 현재 버퍼를 비우고 배치로 반환한다. 플러시 타이머도 함께 초기화한다. + */ + private drainBuffer(): BufferedItem[] { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + const batch = this.buffer; + this.buffer = []; + this.bufferedBytes = 0; + return batch; + } + + /** + * 실제 Elasticsearch `_bulk` 호출을 수행하고, 각 문서의 Promise를 resolve/reject 한다. + */ + private async executeFlush(batch: BufferedItem[]): Promise { + const operations = this.buildOperations(batch); + try { + const response = await this.client.bulk({ operations }); + if (response.errors) { + this.logBulkError(response); + this.logger.warn( + `Bulk 색인 중 일부 문서가 실패했습니다. batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } else { + this.logger.debug( + `Bulk 색인 완료 batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } + } catch (error) { + const wrapped = + error instanceof Error + ? error + : new Error(`Bulk 색인 실패: ${String(error)}`); + this.logger.warn( + "Bulk 색인 요청이 실패했습니다. Kafka 컨슈머는 메시지를 계속 처리합니다.", + wrapped instanceof Error ? wrapped.stack : String(wrapped), + ); + } + } + + /** + * Bulk API가 요구하는 `create`/`document` 쌍의 operations 배열을 생성한다. + */ + private buildOperations( + batch: BufferedItem[], + ): Array> { + const operations: Array> = []; + for (const item of batch) { + // 데이터 스트림은 create op만 허용하므로 bulk 액션을 create로 지정한다. + operations.push({ create: { _index: item.index } }); + operations.push({ ...item.document }); + } + return operations; + } + + /** + * `_bulk` 응답 내 첫 번째 에러 내용을 추출하여 로그로 남긴다. + */ + private logBulkError(response: { items?: Array> }): void { + const firstError = response.items + ?.map((item) => Object.values(item)[0]) + .find((result) => result && result.error); + if (firstError) { + this.logger.error( + `Bulk 색인 실패: type=${firstError.error?.type} reason=${firstError.error?.reason}`, + ); + } else { + this.logger.error("Bulk 색인 실패: 응답 내 오류 세부 정보 없음"); + } + } + + /** + * 프로세스 종료 시 남은 버퍼/플러시가 모두 끝날 때까지 기다린다. + */ + private async flushRemaining(): Promise { + while (this.buffer.length > 0 || this.inFlightFlushes > 0) { + if (this.buffer.length > 0) { + this.triggerFlush(); + } + await new Promise((resolve) => setTimeout(resolve, 50)); + } + } +} diff --git a/backend/src/stream-processor/common/bulk-ingest.module.ts b/backend/src/stream-processor/common/bulk-ingest.module.ts new file mode 100644 index 0000000..34a4926 --- /dev/null +++ b/backend/src/stream-processor/common/bulk-ingest.module.ts @@ -0,0 +1,14 @@ +import { Global, Module } from "@nestjs/common"; +import { ApmInfrastructureModule } from "../../shared/apm/apm.module"; +import { BulkIndexerService } from "./bulk-indexer.service"; + +/** + * Bulk 인덱싱 서비스를 전역으로 제공하는 모듈 + */ +@Global() +@Module({ + imports: [ApmInfrastructureModule], + providers: [BulkIndexerService], + exports: [BulkIndexerService], +}) +export class BulkIngestModule {} diff --git a/backend/src/stream-processor/log-consumer/log-consumer.controller.ts b/backend/src/stream-processor/log-consumer/log-consumer.controller.ts index 1405236..5f166bf 100644 --- a/backend/src/stream-processor/log-consumer/log-consumer.controller.ts +++ b/backend/src/stream-processor/log-consumer/log-consumer.controller.ts @@ -40,7 +40,7 @@ export class LogConsumerController { try { const dto = this.parsePayload(value); - await this.logIngestService.ingest(dto); + this.logIngestService.ingest(dto); await this.errorLogForwarder.forward(dto); this.throughputTracker.markProcessed(); this.logger.debug( diff --git a/backend/src/stream-processor/sample-producer/send-sample-span.ts b/backend/src/stream-processor/sample-producer/send-sample-span.ts index 98d53ce..f3ba624 100644 --- a/backend/src/stream-processor/sample-producer/send-sample-span.ts +++ b/backend/src/stream-processor/sample-producer/send-sample-span.ts @@ -34,7 +34,7 @@ async function main(): Promise { parent_span_id: null, name: "GET /api/sample", kind: "SERVER", - duration_ms: 23.5, + duration_ms: 50.5, status: "OK", http_method: "GET", http_path: "/api/sample", diff --git a/backend/src/stream-processor/span-consumer/span-consumer.controller.ts b/backend/src/stream-processor/span-consumer/span-consumer.controller.ts index 2b341a8..ac1e0d3 100644 --- a/backend/src/stream-processor/span-consumer/span-consumer.controller.ts +++ b/backend/src/stream-processor/span-consumer/span-consumer.controller.ts @@ -27,7 +27,7 @@ export class SpanConsumerController { constructor(private readonly spanIngestService: SpanIngestService) {} @EventPattern(process.env.KAFKA_APM_SPAN_TOPIC ?? "apm.spans") - async handleSpanEvent(@Ctx() context: KafkaContext): Promise { + handleSpanEvent(@Ctx() context: KafkaContext): void { const value = context.getMessage().value; if (value == null) { this.logger.warn("Kafka 메시지에 본문이 없어 처리를 건너뜁니다."); @@ -36,7 +36,7 @@ export class SpanConsumerController { try { const dto = this.parsePayload(value); - await this.spanIngestService.ingest(dto); + this.spanIngestService.ingest(dto); this.throughputTracker.markProcessed(); this.logger.debug( `스팬이 색인되었습니다. topic=${context.getTopic()} partition=${context.getPartition()}`, diff --git a/k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md b/k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md deleted file mode 100644 index ba4ee0f..0000000 --- a/k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md +++ /dev/null @@ -1,97 +0,0 @@ -# Log Collector CPU 메트릭 수집 설정 - -쿠버네티스의 **log-collector Pod (3001번 포트 서버)** CPU 사용량을 수집합니다. - ---- - -## 로컬에서 테스트하는 방법 - -### 1. 사전 준비 -- Docker Desktop 실행 -- kind 클러스터 실행 중 -- Kafka/Elasticsearch/Redis 실행 중 (`cd infra && docker-compose up -d kafka elasticsearch redis`) - -### 2. FluentBit 설정 적용 - -```bash -# 1. 설정 배포 -cd k8s_userside_log_generator/k8s_http_to_flu_to_server -kubectl apply -f fluent-bit.yaml - -# 2. FluentBit 재시작 -kubectl rollout restart daemonset/fluent-bit -n default - -# 3. 재시작 완료 대기 (30초~1분) -kubectl rollout status daemonset/fluent-bit -n default -``` - -### 3. 백엔드 실행 - -```bash -cd backend -npm run start:dev:metrics-consumer -``` - -**30초마다 이런 로그가 나오면 성공:** -``` -[SYSTEM METRIC] service=log-collector pod=log-collector-xxx CPU=15.00% Memory=200Mi -``` - -### 4. 데이터 확인 - -```bash -# Query API 메트릭 엔드포인트 확인 -curl "http://localhost:3001/services/log-collector/metrics?metric=http_requests_total" -``` - ---- - -## test-log-collector-metrics.sh 파일 - -자동 테스트 스크립트입니다. 다음을 자동으로 확인합니다: -- FluentBit Pod 상태 -- log-collector Pod 상태 -- FluentBit 설정 -- Kafka 메시지 - -**사용법:** -```bash -cd k8s_userside_log_generator -./test-log-collector-metrics.sh -``` - ---- - -## 트러블슈팅 - -### "데이터가 안 들어와요" - -1. **log-collector Pod 확인** - ```bash - kubectl get pods | grep log-collector - ``` - -2. **FluentBit 재시작** - ```bash - kubectl rollout restart daemonset/fluent-bit -n default - ``` - -3. **30초 기다리기** (FluentBit이 30초마다 수집함) - -4. **Kafka 확인** - ```bash - docker exec panopticon-kafka /opt/kafka/bin/kafka-console-consumer.sh \ - --bootstrap-server localhost:9092 \ - --topic metrics.system \ - --max-messages 5 - ``` - ---- - -## 요약 - -1. `kubectl apply -f fluent-bit.yaml` - 설정 적용 -2. `kubectl rollout restart daemonset/fluent-bit` - 재시작 -3. `npm run start:dev:metrics-consumer` - 백엔드 실행 -4. 30초 기다리기 -5. Query API에서 확인 (`GET /services/{serviceName}/metrics`) diff --git a/k8s_userside_log_generator/README.md b/k8s_userside_log_generator/README.md deleted file mode 100644 index 578d508..0000000 --- a/k8s_userside_log_generator/README.md +++ /dev/null @@ -1,164 +0,0 @@ -# 테스트 소개 - -## 테스트 종류 - -## 1. k8s-http-to-flu-to-server - -해당 프로그램은 쿠버네티스 환경에서 **HTTP API 호출**을 통해 로그를 생성하고, -fluent-bit이 이를 감지하여 수집서버로 전달합니다. -**Ingress**를 통해 외부에서 API에 접근 가능하며, 실제 프로덕션 환경과 유사한 테스트입니다. - -### 사용법 - -#### 방법 1: 자동 배포 스크립트 (권장) - -```bash -cd log_collect_tests/k8s_http_to_flu_to_server -./deploy.sh -``` - -스크립트가 자동으로 다음을 수행합니다: - -- Kind 클러스터 확인 -- 기존 FluentBit 리소스 정리 (충돌 방지) -- Docker 이미지 빌드 -- Kind 클러스터에 이미지 로드 -- Kubernetes 리소스 배포 (Ingress 포함) -- 파드 준비 상태 대기 -- 사용법 안내 - -**리소스 삭제:** - -```bash -./cleanup.sh -``` - -**Kind 클러스터가 없는 경우:** - -```bash -# kind-config.yaml을 사용하여 클러스터 생성 (Ingress 설정 포함) -kind create cluster --config kind-config.yaml --name log-cluster - -# Ingress Controller 설치 (deploy.sh가 자동으로 확인 및 설치하지만, 수동 설치 시) -kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml - -# Ingress Controller 준비 확인 -kubectl wait --namespace ingress-nginx \ - --for=condition=ready pod \ - --selector=app.kubernetes.io/component=controller \ - --timeout=90s - -# control-plane에 설치되었는지 확인 -kubectl get pods -n ingress-nginx -o wide -``` - -#### 방법 2: 수동 배포 - -1. **Docker 이미지 빌드** - - ```bash - # 수집서버 이미지 빌드 - cd log_collect_tests/log_collect_server - docker build -t log-collector:latest . - - # 생성서버 이미지 빌드 - cd ../log_generator_server - docker build -t log-generator:latest . - ``` - -2. **Kind 클러스터에 이미지 로드** - - ```bash - kind load docker-image log-collector:latest --name log-cluster - kind load docker-image log-generator:latest --name log-cluster - ``` - -3. **Kubernetes 리소스 배포** - - ```bash - cd ../k8s_http_to_flu_to_server - - kubectl apply -f log-generator-deployment.yaml - kubectl apply -f log-collect-deployment.yaml - kubectl apply -f fluent-bit.yaml - kubectl apply -f ingress.yaml - ``` - -4. **배포 확인** - - - `kubectl get pods` - 모든 파드가 Running 상태인지 확인 - - `kubectl get services` - 서비스가 정상 생성되었는지 확인 - - `kubectl get ingress` - Ingress가 localhost로 매핑되었는지 확인 - -5. **테스트** - - ```bash - # API 호출 테스트 (단건) - curl http://localhost:8080/api/users/3 - - # 자동 로그 생성 (10회) - curl http://localhost:8080/api/autolog - ``` - -6. **로그 확인** - - ```bash - # 수집서버 로그 확인 (FluentBit이 전달한 로그 확인) - kubectl logs -l app=log-collector -f - - # 생성서버 로그 확인 (원본 로그 확인) - kubectl logs -l app=log-generator -f - ``` - -7. **정상 작동 확인** - - `curl http://localhost:8080/api/autolog` 호출 시 - - 생성서버에서 10번의 로그 출력 확인 - - 수집서버에서 FluentBit을 통해 전달된 로그 수신 확인 - -### 환경설정 - -- dockerDesktop → 환경설정 → kubernetes on -- **Kind 클러스터 요구사항:** - - `kind-config.yaml`에 Ingress 설정 포함: - ```yaml - nodes: - - role: control-plane - kubeadmConfigPatches: - - | - kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - node-labels: "ingress-ready=true" - extraPortMappings: - - containerPort: 80 - hostPort: 8080 # localhost:8080으로 접근 - protocol: TCP - ``` - - Nginx Ingress Controller 설치 (deploy.sh가 자동 설치) - -### 유용한 디버깅 명령어 - -```bash -# 전체 리소스 확인 -kubectl get all - -# 파드 상태 상세 확인 -kubectl describe pod - -# 파드 로그 확인 (최근 100줄) -kubectl logs --tail=100 - -# 파드 실시간 로그 -kubectl logs -f - -# 레이블별 로그 확인 -kubectl logs -l app=log-generator -f - -# 이벤트 확인 (문제 진단) -kubectl get events --sort-by='.lastTimestamp' - -# 리소스 사용량 -kubectl top pods -``` - ---- diff --git a/k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh b/k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh deleted file mode 100755 index 7adec2a..0000000 --- a/k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -set -e - -echo "🧹 k8s-http-to-flu-to-server 리소스 삭제 시작" -echo "" - -# 현재 스크립트 위치 기준으로 경로 설정 -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -cd "$SCRIPT_DIR" - -echo "🗑️ Kubernetes 리소스 삭제 중..." - -kubectl delete -f ingress.yaml --ignore-not-found=true -kubectl delete -f fluent-bit.yaml --ignore-not-found=true -kubectl delete -f log-collect-deployment.yaml --ignore-not-found=true -kubectl delete -f log-generator-deployment.yaml --ignore-not-found=true - -echo "" -echo "✅ 모든 리소스 삭제 완료!" -echo "" - -# 남은 파드 확인 -REMAINING_PODS=$(kubectl get pods --no-headers 2>/dev/null | wc -l | tr -d ' ') - -if [ "$REMAINING_PODS" -gt 0 ]; then - echo "📊 남은 파드:" - kubectl get pods -else - echo "✅ 모든 파드가 삭제되었습니다." -fi - -echo "" diff --git a/k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh b/k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh deleted file mode 100755 index 66d6e65..0000000 --- a/k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh +++ /dev/null @@ -1,147 +0,0 @@ -#!/bin/bash -set -e - -echo "🚀 k8s-http-to-flu-to-server 배포 시작" -echo "📍 대상: Kind 클러스터 (log-cluster)" -echo "" - -# 현재 스크립트 위치 기준으로 경로 설정 -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" - -echo "📂 프로젝트 경로: $PROJECT_ROOT" -echo "" - -# Kind 클러스터 확인 -if ! kind get clusters | grep -q "log-cluster"; then - echo "❌ 'log-cluster' Kind 클러스터를 찾을 수 없습니다." - echo "" - echo "다음 명령어로 클러스터를 생성하세요:" - echo " kind create cluster --config $SCRIPT_DIR/kind-config.yaml --name log-cluster" - echo "" - exit 1 -fi - -echo "✅ Kind 클러스터 확인 완료" -echo "" - -# Kind 클러스터 context 설정 -kubectl config use-context kind-log-cluster > /dev/null 2>&1 - -# Ingress Controller 확인 및 설치 -echo "🔍 Ingress Controller 확인 중..." -if ! kubectl get pods -n ingress-nginx | grep -q "ingress-nginx-controller"; then - echo "📦 Ingress Controller 설치 중..." - kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml - - echo "⏳ Ingress Controller가 준비될 때까지 대기 중..." - kubectl wait --namespace ingress-nginx \ - --for=condition=ready pod \ - --selector=app.kubernetes.io/component=controller \ - --timeout=90s - - echo "✅ Ingress Controller 설치 완료" -else - echo "✅ Ingress Controller 이미 설치됨" - - # control-plane에서 실행 중인지 확인 - INGRESS_NODE=$(kubectl get pods -n ingress-nginx -o wide | grep ingress-nginx-controller | awk '{print $7}') - if [[ "$INGRESS_NODE" == *"control-plane"* ]]; then - echo " └─ control-plane에서 실행 중 ✓" - else - echo " └─ ⚠️ $INGRESS_NODE 에서 실행 중 (control-plane이 아님)" - fi -fi -echo "" - -# 1. 기존 FluentBit 삭제 (다른 테스트의 FluentBit 설정과 충돌 방지) -echo "🧹 기존 FluentBit 리소스 정리..." -kubectl delete daemonset fluent-bit --ignore-not-found=true -kubectl delete configmap fluent-bit-config --ignore-not-found=true -kubectl delete serviceaccount fluent-bit --ignore-not-found=true -kubectl delete clusterrole fluent-bit --ignore-not-found=true -kubectl delete clusterrolebinding fluent-bit --ignore-not-found=true -echo "✅ 기존 FluentBit 정리 완료" -echo "" - -# 2. Docker 이미지 빌드 -echo "🔨 Docker 이미지 빌드 중..." - -echo " - log-collector 이미지 빌드..." -cd "$PROJECT_ROOT/k8s_userside_log_generator/log_collect_server" -docker build -t log-collector:latest . -q - -echo " - log-generator 이미지 빌드..." -cd "$PROJECT_ROOT/k8s_userside_log_generator/log_generator_server" -docker build -t log-generator:latest . -q - -echo "✅ Docker 이미지 빌드 완료" -echo "" - -# 3. Kind 클러스터에 이미지 로드 -echo "📦 Kind 클러스터에 이미지 로드 중..." -kind load docker-image log-collector:latest --name log-cluster -kind load docker-image log-generator:latest --name log-cluster -echo "✅ 이미지 로드 완료" -echo "" - -# 4. Kubernetes 리소스 배포 -echo "☸️ Kubernetes 리소스 배포 중..." -cd "$SCRIPT_DIR" - -kubectl apply -f log-generator-deployment.yaml -kubectl apply -f log-collect-deployment.yaml -kubectl apply -f fluent-bit.yaml -kubectl apply -f ingress.yaml - -echo "✅ Kubernetes 리소스 배포 완료" -echo "" - -# 5. 배포 상태 확인 -echo "⏳ 파드가 준비될 때까지 대기 중..." -sleep 3 - -kubectl wait --for=condition=ready pod -l app=log-collector --timeout=60s -kubectl wait --for=condition=ready pod -l app=log-generator --timeout=60s -kubectl wait --for=condition=ready pod -l app=fluent-bit --timeout=60s - -echo "" -echo "✅ 모든 파드 준비 완료!" -echo "" - -# 6. 배포 확인 -echo "📊 배포 상태 확인:" -echo "" -echo "Pods:" -kubectl get pods -echo "" -echo "Services:" -kubectl get services -echo "" -echo "Ingress:" -kubectl get ingress -echo "" - -# 7. 사용법 안내 -echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -echo "✅ k8s-http-to-flu-to-server 배포 완료!" -echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -echo "" -echo "📝 테스트 방법:" -echo "" -echo "1. API 호출 테스트 (단건):" -echo " curl http://localhost:8080/api/users/3" -echo "" -echo "2. 자동 로그 생성 (10회):" -echo " curl http://localhost:8080/api/autolog" -echo "" -echo "3. 수집서버 로그 확인 (FluentBit이 전달한 로그):" -echo " kubectl logs -l app=log-collector -f" -echo "" -echo "4. 생성서버 로그 확인 (원본 로그):" -echo " kubectl logs -l app=log-generator -f" -echo "" -echo "5. FluentBit 로그 확인:" -echo " kubectl logs -l app=fluent-bit -f" -echo "" -echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" diff --git a/k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml b/k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml deleted file mode 100644 index 5b36e2d..0000000 --- a/k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml +++ /dev/null @@ -1,229 +0,0 @@ -apiVersion: v1 -kind: ServiceAccount -metadata: - name: fluent-bit ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: fluent-bit -rules: - - apiGroups: [""] - resources: - - namespaces - - pods - verbs: ["get", "list", "watch"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: fluent-bit -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: fluent-bit -subjects: - - kind: ServiceAccount - name: fluent-bit - namespace: default ---- -apiVersion: v1 -kind: ConfigMap -metadata: - name: fluent-bit-config -data: - fluent-bit.conf: | - [SERVICE] - Flush 5 - Daemon Off - Log_Level info - Parsers_File parsers.conf - - [INPUT] - Name tail - Path /var/log/containers/*ingress-nginx-controller*.log - Tag kube.ingress.* - Refresh_Interval 5 - Mem_Buf_Limit 5MB - Skip_Long_Lines Off - - [INPUT] - Name tail - Path /var/log/containers/*log-generator*.log - Tag kube.backend.* - Refresh_Interval 5 - Mem_Buf_Limit 5MB - Skip_Long_Lines Off - - # === Ingress 로그 파싱 === - - # 1단계: Docker 컨테이너 로그 형식 파싱 - [FILTER] - Name parser - Match kube.ingress.* - Key_Name log - Parser cri_log - Reserve_Data On - - # 2단계: Nginx access log 파싱 - [FILTER] - Name parser - Match kube.ingress.* - Key_Name message - Parser nginx_ingress - Reserve_Data On - - # 3단계: 필드 추가 - [FILTER] - Name modify - Match kube.ingress.* - Add log_type http - - # === Backend 로그 파싱 === - - # 1단계: Docker 컨테이너 로그 형식 파싱 - [FILTER] - Name parser - Match kube.backend.* - Key_Name log - Parser cri_log - Reserve_Data On - - # 2단계: JSON 내용 파싱 - [FILTER] - Name parser - Match kube.backend.* - Key_Name message - Parser json_log - Reserve_Data On - - # 3단계: log_type이 없는 로그에 application 추가 (필드가 없을 때만 추가됨) - [FILTER] - Name modify - Match kube.backend.* - Add log_type application - - # 4단계: log_type에 따라 태그 분리 - [FILTER] - Name rewrite_tag - Match kube.backend.* - Rule $log_type ^metrics$ kube.metrics false - Rule $log_type ^application$ kube.app false - Emitter_Name re_emitted - - # 4단계: log-collector 필드 추가 - [FILTER] - Name modify - Match metrics.system.log-collector - Add service log-collector - Add namespace default - Add nodeName kind-control-plane - - # === 공통 정리 === - - # 불필요한 필드 제거 - [FILTER] - Name record_modifier - Match * - # Remove_key log - # Remove_key message - # Remove_key stream - Remove_key logtag - # Remove_key time - Remove_key _p - - # === Kafka 출력 === - - [OUTPUT] - Name kafka - Match kube.ingress.* - Brokers 172.21.100.253:9092 - Topics logs.http - Format json - - [OUTPUT] - Name kafka - Match kube.app - Brokers 172.21.100.253:9092 - Topics logs.app - Format json - - [OUTPUT] - Name kafka - Match kube.metrics - Brokers 172.21.100.253:9092 - Topics logs.metric - Format json - - parsers.conf: | - # CRI/Kubernetes 컨테이너 로그 형식 파서 - # 형식: - # 예: 2025-11-03T17:19:58.353137053Z stdout F actual log content here - [PARSER] - Name cri_log - Format regex - Regex ^(?