From a583410a6d7e1650927eae032e3ea5a1507621a9 Mon Sep 17 00:00:00 2001 From: jeonseyeong Date: Tue, 18 Nov 2025 02:15:19 +0900 Subject: [PATCH 1/8] =?UTF-8?q?=EC=9D=B8=EB=8D=B1=EC=8B=B1=EB=B0=B0?= =?UTF-8?q?=EC=B9=98=EC=B2=98=EB=A6=AC=EB=A1=9C=EB=A6=AC=ED=8C=A9=ED=86=A0?= =?UTF-8?q?=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AGENTS.md | 2 +- backend/README.md | 11 + backend/apm_bulk_design.md | 198 ++++++++++++ backend/rollup_metrics_spec.md | 34 +- .../src/notification/notification.module.ts | 8 - .../src/notification/notification.service.ts | 88 ------ .../apm/log-ingest/log-ingest.module.ts | 3 +- .../apm/log-ingest/log-ingest.service.ts | 10 +- .../apm/span-ingest/span-ingest.module.ts | 3 +- .../apm/span-ingest/span-ingest.service.ts | 10 +- .../common/bulk-indexer.service.ts | 210 +++++++++++++ .../common/bulk-ingest.module.ts | 14 + .../sample-producer/send-sample-span.ts | 2 +- .../LOG_COLLECTOR_METRICS_SETUP.md | 97 ------ k8s_userside_log_generator/README.md | 164 ---------- .../k8s_http_to_flu_to_server/cleanup.sh | 33 -- .../k8s_http_to_flu_to_server/deploy.sh | 147 --------- .../k8s_http_to_flu_to_server/fluent-bit.yaml | 229 -------------- .../k8s_http_to_flu_to_server/ingress.yaml | 18 -- .../kind-config.yaml | 20 -- .../log-collect-deployment.yaml | 46 --- .../log-generator-deployment.yaml | 38 --- .../log_collect_server/.dockerignore | 8 - .../log_collect_server/.gitignore | 5 - .../log_collect_server/Dockerfile | 18 -- .../log_collect_server/index.js | 51 --- .../log_collect_server/package.json | 20 -- .../log_generator_server/.dockerignore | 8 - .../log_generator_server/.gitignore | 5 - .../log_generator_server/Dockerfile | 18 -- .../log_generator_server/index.js | 296 ------------------ .../log_generator_server/package.json | 23 -- 32 files changed, 469 insertions(+), 1368 deletions(-) create mode 100644 backend/apm_bulk_design.md delete mode 100644 backend/src/notification/notification.module.ts delete mode 100644 backend/src/notification/notification.service.ts create mode 100644 backend/src/stream-processor/common/bulk-indexer.service.ts create mode 100644 backend/src/stream-processor/common/bulk-ingest.module.ts delete mode 100644 k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md delete mode 100644 k8s_userside_log_generator/README.md delete mode 100755 k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh delete mode 100755 k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh delete mode 100644 k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml delete mode 100644 k8s_userside_log_generator/k8s_http_to_flu_to_server/ingress.yaml delete mode 100644 k8s_userside_log_generator/k8s_http_to_flu_to_server/kind-config.yaml delete mode 100644 k8s_userside_log_generator/k8s_http_to_flu_to_server/log-collect-deployment.yaml delete mode 100644 k8s_userside_log_generator/k8s_http_to_flu_to_server/log-generator-deployment.yaml delete mode 100644 k8s_userside_log_generator/log_collect_server/.dockerignore delete mode 100644 k8s_userside_log_generator/log_collect_server/.gitignore delete mode 100644 k8s_userside_log_generator/log_collect_server/Dockerfile delete mode 100644 k8s_userside_log_generator/log_collect_server/index.js delete mode 100644 k8s_userside_log_generator/log_collect_server/package.json delete mode 100644 k8s_userside_log_generator/log_generator_server/.dockerignore delete mode 100644 k8s_userside_log_generator/log_generator_server/.gitignore delete mode 100644 k8s_userside_log_generator/log_generator_server/Dockerfile delete mode 100644 k8s_userside_log_generator/log_generator_server/index.js delete mode 100644 k8s_userside_log_generator/log_generator_server/package.json diff --git a/AGENTS.md b/AGENTS.md index e52a796..a71bb27 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,7 +1,7 @@ # Repository Guidelines ## Project Structure & Module Organization -모노레포 루트에는 `backend`(주요 NestJS 모노앱), `producerServer`, `app_api`, `infra`, `k8s_userside_log_generator` 가 있습니다. 일상 개발은 `backend` 디렉터리에서 수행하며 `src/query-api`(읽기 전용 API), `src/stream-processor`(카프카 소비자), `src/shared`(공용 DTO·서비스)로 나뉩니다. 런타임 산출물은 `dist/`에 생성되고, 환경 전용 설정은 `.env`, `.env.local`, `infra/docker-compose.yml` 등에 위치합니다. 테스트 스위트는 `src/**/*.spec.ts`에 두고, 샘플 수집 스크립트는 `src/stream-processor/log-consumer/app` 하위에 있습니다. +모노레포 루트에는 `backend`(주요 NestJS 모노앱), `producerServer`, `app_api`, `infra` 가 있습니다. 일상 개발은 `backend` 디렉터리에서 수행하며 `src/query-api`(읽기 전용 API), `src/stream-processor`(카프카 소비자), `src/shared`(공용 DTO·서비스)로 나뉩니다. 런타임 산출물은 `dist/`에 생성되고, 환경 전용 설정은 `.env`, `.env.local`, `infra/docker-compose.yml` 등에 위치합니다. 테스트 스위트는 `src/**/*.spec.ts`에 두고, 샘플 수집 스크립트는 `src/stream-processor/log-consumer/app` 하위에 있습니다. ## Build, Test, and Development Commands 모든 명령은 `backend`에서 실행합니다. `npm run start:query-api`, `npm run start:log-consumer`, `npm run start:metrics-consumer` 는 각각의 Nest 엔트리를 ts-node로 부팅합니다. 프로덕션 빌드는 `npm run build` 또는 서비스별 `npm run build:query-api`, `npm run build:stream-processor` 로 생성합니다. 로컬 통합 확인 시 `docker compose -f infra/docker-compose.yml up kafka elasticsearch redis` 로 인프라를 띄운 뒤 애플리케이션을 npm 스크립트로 실행합니다. 샘플 이벤트는 `npm run test:app-log`·`npm run test:http-log` 로 발행합니다. diff --git a/backend/README.md b/backend/README.md index f0c0b4c..f8b3a1a 100644 --- a/backend/README.md +++ b/backend/README.md @@ -67,6 +67,17 @@ Kafka 컨슈머 처리량을 주기적으로 파악하고 싶다면 다음 환 | `STREAM_THROUGHPUT_MIN_INTERVAL_MS` | `10000` | 처리량 로그 사이의 최소 간격(ms). 너무 잦은 로깅을 방지합니다. | | `STREAM_THROUGHPUT_TARGET_COUNT` | _(옵션)_ | 총 N건 처리 완료까지의 예상 소요 시간을 로그에 함께 표시합니다. | +### Bulk 색인 버퍼 옵션 + +`apm.logs`/`apm.spans` 컨슈머는 Elasticsearch `_bulk` API로 배치 색인을 수행합니다. 아래 환경 변수를 통해 버퍼 크기와 플러시 동시성을 조정할 수 있습니다. + +| 변수 | 기본값 | 설명 | +| --- | --- | --- | +| `BULK_BATCH_SIZE` | `500` | 버퍼에 일정 건수 이상 쌓이면 즉시 flush 합니다. | +| `BULK_BATCH_BYTES_MB` | `5` | 문서 크기 합계가 지정한 MB를 넘기면 즉시 flush 합니다. | +| `BULK_FLUSH_INTERVAL_MS` | `1000` | 위 조건을 만족하지 않아도 해당 시간이 지나면 주기적으로 flush 합니다. | +| `BULK_MAX_PARALLEL_FLUSHES` | `1` | 동시에 실행할 bulk 요청 개수. 클러스터 부하에 맞게 1~4 사이에서 조정하세요. | + ### Query API 성능 프로파일링 서비스 메트릭 엔드포인트(`GET /services/{serviceName}/metrics`)가 Elasticsearch 집계를 수행하는데 걸린 시간을 확인하려면 `SERVICE_METRICS_PROFILE=true`를 설정하면 됩니다. diff --git a/backend/apm_bulk_design.md b/backend/apm_bulk_design.md new file mode 100644 index 0000000..7c61c46 --- /dev/null +++ b/backend/apm_bulk_design.md @@ -0,0 +1,198 @@ +# Design Specification: Bulk + Limited Concurrency Ingestion for Elasticsearch + +## 배경과 기존 문제 + +현행 APM 파이프라인은 Kafka에서 로그 메시지를 하나씩 읽어와 `index()` API로 Elasticsearch(E) 데이터스트림에 저장한다. 네트워크 왕복을 매 메시지마다 수행하고, 단건 `index()`가 끝날 때까지 기다린 뒤 다음 메시지를 처리하는 구조 때문에 초당 처리량이 낮고 병렬성이 사실상 없었다. 특히 수십만 건 이상의 로그를 처리할 때 소비자 스레드가 몇 시간 동안 대기하는 현상이 발생하였다. Elastic API 문서는 대규모 데이터 적재 시 단건 `index()` 대신 Bulk API를 사용해야 네트워크 오버헤드를 줄이고 색인 속도를 높일 수 있다고 강조한다【102369654029175†L753-L762】. 그리고 적절한 배치 크기를 찾기 위해 실험이 필요하며, HTTP 요청은 100 MB를 넘지 않도록 주의해야 한다【102369654029175†L753-L757】. 또한 다른 사용자들도 네트워크 왕복을 아끼기 위해 Bulk API를 병렬로 실행하면서 적당한 청크 크기(1 MB–10 MB)와 적정 동시 플러시 수를 찾아야 한다고 조언한다【750595237790637†L52-L61】. + +본 문서는 이러한 문제를 해결하고, **대량 로그(수십만~수백만 건)도 빠르게 색인하면서 at‑least‑once 보장**을 유지하는 아키텍처를 설계한다. Node (NestJS) + Kafka + Elasticsearch 환경을 대상으로 하지만, 설계 개념은 다른 언어와 프레임워크에도 적용 가능하다. + +## 목표 + +- Kafka에서 들어오는 로그를 **at‑least‑once**로 Elasticsearch에 저장하고, 색인 실패 시 재처리를 지원한다. +- **Bulk API**를 사용하여 단건 요청 대비 색인 속도를 크게 높인다. Medium 기사에서도 bulk indexing이 단건 저장보다 훨씬 빠르다고 강조한다【85091753486274†L114-L120】. Python 테스트에서도 1000개 문서를 단건으로 index 하는 데 124 초가 걸린 반면, `streaming_bulk`는 0.67 초, `parallel_bulk`는 0.46 초만에 끝났다【596974854923973†L30-L124】. +- 과도한 동시성으로 Elasticsearch 클러스터를 과부하하지 않도록 **동시 플러시 수를 제한**한다. 실제 사례에서도 32개의 동시 프로세스로 색인하던 것을 1개 프로세스로 줄이자 클러스터 지표가 크게 개선되었다【901591924758741†L125-L143】. +- Kafka consumer가 색인을 기다리지 않고 무제한으로 enqueue하여 메모리를 고갈시키는 일을 방지하기 위해 **back‑pressure** 메커니즘을 도입한다. +- 로그 데이터 변환/검증 과정과 색인 과정의 책임을 분리하여 재사용성을 높이고, 서비스 코드와 분리된 Bulk 인덱싱 모듈을 제공한다. + +## 아키텍처 개요 + +### 데이터 흐름 + +``` +Kafka (topic: apm.logs) ──> NestJS consumer ──> DTO 변환/검증 ──> BulkIndexer (버퍼/플러시) ──> Elasticsearch 데이터스트림 +``` + +![Bulk ingestion flow]({{file:file-3hAzDofY4oYDp2MfohfxJ2}}) + +1. **Kafka consumer**는 메시지를 받아 DTO 변환 및 필수 검증을 수행한다. 검증에서 오류가 발생하면 해당 메시지를 건너뛰고 Kafka offset을 커밋한다. +2. DTO를 Elasticsearch 문서 구조(`LogDocument`)로 매핑한 뒤 **BulkIndexer**에 enqueue 한다. 이 단계에서는 실제 색인 작업을 수행하지 않고, 메모리 버퍼에 문서와 offset 정보를 저장한다. enqueue 호출은 `Promise`를 반환하며, 문서가 색인되면 resolve되고 실패 시 reject된다. +3. **BulkIndexer**는 배치 크기 또는 시간 조건이 만족될 때 버퍼에 쌓인 문서를 `_bulk` API로 전송한다. 클라이언트는 `Content‑Type: application/x‑ndjson`를 사용하여 NDJSON 포맷으로 전송해야 한다는 Elastic 문서를 준수한다【102369654029175†L738-L752】. +4. Flush 결과를 보고 개별 문서의 `resolve/reject`를 호출한다. 한 아이템이라도 실패하면 단순화된 전략으로 모든 문서를 실패로 취급하여 Kafka 재처리를 유도한다. 이로써 at‑least‑once 보장을 유지하지만 중복 삽입 가능성을 인정한다. +5. Consumer handler는 enqueue 반환값을 `await`하므로, 문서가 성공적으로 색인돼야만 offset을 커밋한다. 실패 시 예외를 발생시켜 Kafka consumer가 재시도하게 한다. + +### BulkIndexer 버퍼 설계 + +BulkIndexer는 싱글톤 서비스로서 다음과 같은 상태를 가진다: + +- `buffer: Array` — 각 항목은 `document`, Kafka offset 메타데이터, `resolve`/`reject` 콜백을 포함한다. +- `flushTimer: NodeJS.Timeout | null` — 시간 기반 플러시 예약용 타이머. +- `inFlightFlushes: number` — 현재 진행 중인 flush의 수. `MAX_PARALLEL_FLUSHES` 한도를 초과하면 새 flush를 미뤄서 클러스터 과부하를 방지한다. + +#### enqueue 동작 + +`enqueue(document, offsetInfo): Promise`는 다음과 같이 동작한다. + +1. `Promise`를 생성하고, `resolve`/`reject`를 `BufferedItem`에 저장한다. +2. `buffer`에 아이템을 추가한다. +3. 버퍼 길이가 `BATCH_SIZE` 이상이면 즉시 `flush()`를 시도한다. `flush()`는 `inFlightFlushes < MAX_PARALLEL_FLUSHES`일 때만 실행되며, 그렇지 않으면 다음 enqueue 또는 timer에서 재시도한다. +4. `flushTimer`가 설정되어 있지 않으면 `setTimeout(flush, FLUSH_INTERVAL_MS)`를 등록하여 일정 시간이 지나도 남아있는 문서를 flush할 수 있게 한다. +5. `Promise`를 반환하여 consumer handler가 `await`할 수 있도록 한다. + +#### flush 동작 + +`flush()`는 버퍼에서 현재 쌓여있는 항목을 모두 꺼내 하나의 bulk 요청으로 전송한다. + +- 메모리 버퍼는 매 flush 후 비워진다. 다른 메시지가 들어오면 새 버퍼에서 다시 쌓인다. +- `_bulk` 요청은 NDJSON 형식으로 `{ "index" : {"_index": data_stream} }\n{document}\n` 패턴으로 구성한다. +- HTTP 요청은 100 MB를 넘지 않도록 해야 한다는 Elastic 문서를 준수한다【102369654029175†L753-L757】. 배치 크기 기준은 문서 수(`BATCH_SIZE`) 또는 추정 바이트 수(예: 5 MB)를 동시에 고려할 수 있다. +- 요청 결과에서 `errors` 필드가 `true`이면 **간단한 전략으로 전체 배치를 실패로 간주**한다. Jörg Prante의 조언처럼 하나의 bulk 내 아이템들이 실패하면 동시에 재시도하는 것이 구현을 단순화한다【750595237790637†L52-L61】. +- 성공 시 각 아이템의 `resolve()`를 호출하여 consumer handler가 정상적으로 offset을 커밋할 수 있도록 한다. 실패 시 `reject()`를 호출하여 예외를 propagate 하고 Kafka에서 재처리한다. + +#### 플러시 조건 + +- `BATCH_SIZE` (문서 개수 기준): 500 ~ 1000개부터 시작하여 환경에 따라 조정한다. Elastic 커뮤니티에서는 1 MB ~ 10 MB 정도의 bulk 크기를 권장하며, 너무 큰 청크(예: 64 MB 이상)는 GC나 heap 부하를 유발할 수 있다고 한다【750595237790637†L52-L61】. +- `FLUSH_INTERVAL_MS` (시간 기준): 배치 크기에 도달하지 않더라도 일정 시간(예: 1000 ms) 이상 버퍼에 머무는 문서가 있으면 flush한다. 낮은 트래픽 환경에서 문서가 너무 오래 지연되는 것을 막는다. +- `MAX_PARALLEL_FLUSHES`: 동시에 몇 개의 bulk 요청을 허용할지 제어한다. 값이 1이면 한 번에 하나만 flush하고, 나머지는 버퍼에 대기한다. 높은 동시성을 주려면 2 ~ 4까지 늘리고, 클러스터에서 429 (too many requests) 에러가 발생하면 줄인다【85091753486274†L118-L129】. + +### Kafka offset 커밋 전략 + +NestJS Kafka 마이크로서비스는 handler가 `Promise`를 반환하고 예외를 발생시키는지 여부를 기준으로 offset을 커밋하거나 재처리한다. 따라서 bulk flush 결과가 성공했을 때만 enqueue에서 반환된 `Promise`를 resolve하고, 메시지 처리 함수(`handleLogEvent`)는 `await`를 통해 색인 완료를 보장해야 한다. 실패 시 reject하여 예외가 전파되면 offset이 커밋되지 않고 Kafka가 재전송한다. 이 방식은 **at‑least‑once** 보장을 유지하는 동시에 기본 NestJS API를 변경하지 않아 구현이 간단하다. + +### 오류 처리 전략 + +1. **DTO 검증 오류**: 현재 구조처럼 `InvalidLogEventError`를 구분하여 메시지를 skip 하고 성공적으로 offset을 커밋한다. 잘못된 이벤트는 재시도해도 성공할 가능성이 없으므로 유실을 허용한다. +2. **bulk 전체 실패(네트워크 오류 등)**: flush한 모든 문서를 실패로 간주하고 각 `reject`를 호출한다. Kafka에서는 재처리되어 중복 색인이 발생할 수 있으나 로그 데이터는 idempotent하지 않아도 되므로 허용한다. +3. **bulk 부분 실패**: Elasticsearch 응답에서 `errors`가 `true`일 때 성공/실패 항목을 따로 구분할 수 있다. 구현이 복잡해지므로 초기에는 **전체 배치를 실패**로 간주하는 단순 전략을 사용한다. 향후 요구 사항에 따라 실패한 문서만 재처리하거나 별도의 DLQ(Dead Letter Queue)를 도입할 수 있다. + +### 동시성 및 back‑pressure + +노드 프로세스는 이벤트 루프를 사용하므로 무제한으로 I/O를 발행할 경우 메모리나 소켓 수가 고갈될 수 있다. 따라서 BulkIndexer는 `MAX_PARALLEL_FLUSHES`를 통해 한 번에 수행할 flush 수를 제한한다. 버퍼에 데이터가 넘치면 Kafka consumer는 자연스럽게 `await enqueue` 호출에서 대기하게 되어 back‑pressure가 형성된다. 이를 통해 consumer가 처리 가능한 속도 이상으로 메시지를 읽어오지 않게 된다. 실제 사례에서도 색인 동시성을 32에서 1로 줄이자 클러스터 CPU와 검색 지연이 크게 줄었다【901591924758741†L125-L143】. + +#### 다중 작업자(Workers) 사용과 429 대응 + +Elastic Docs는 단일 스레드가 bulk 요청을 보내서는 클러스터 자원을 모두 활용할 수 없다고 설명한다【389495691530842†L924-L931】. 따라서 여러 스레드나 프로세스를 사용해 동시적으로 bulk 요청을 보내면 I/O 비용을 나누고 전체 처리량을 높일 수 있다. 그러나 너무 많은 병렬 작업자는 클러스터의 메모리와 CPU를 고갈시켜 `TOO_MANY_REQUESTS (429)` 오류를 초래할 수 있다【389495691530842†L932-L941】. 권장 사항은 다음과 같다: + +* **복수 작업자/스레드 사용**: 한 스레드만으로는 클러스터를 포화시키기 어렵다. 여러 작업자를 통해 병렬로 bulk 요청을 전송해 전체 throughput을 높인다【389495691530842†L924-L931】. +* **적정 동시성 찾기**: 적정한 작업자 수는 환경마다 다르므로 실험적으로 찾아야 한다. 한 번에 작업자 수를 조금씩 늘리면서 CPU나 디스크 I/O가 포화되는 지점을 확인한다【389495691530842†L943-L945】. +* **429 오류 감지와 백오프**: 클러스터가 과부하되면 `EsRejectedExecutionException`을 통해 429 코드를 반환한다【389495691530842†L937-L941】. 이때는 indexer가 잠시 대기 후 재시도하는 지수 백오프 로직을 적용해야 한다【389495691530842†L937-L941】. 앞서 언급한 Medium 기사에서도 429 오류가 발생할 때는 임의의 지연으로 확장 지수 백오프를 쓰라고 권장한다【85091753486274†L118-L129】. + +이러한 지침을 따라 `MAX_PARALLEL_FLUSHES`와 Kafka consumer 인스턴스 수를 조정하면 클러스터 리소스를 적절히 활용할 수 있다. 너무 적으면 쓰기 throughput이 낮아지고, 너무 많으면 429 오류와 지연이 발생한다. 따라서 모니터링 도구를 활용해 적절한 지점을 찾아야 한다. + +## Elasticsearch 설정 가이드 + +Bulk 인덱싱 성능은 클라이언트 설계뿐 아니라 Elasticsearch 클러스터 설정에도 크게 좌우된다. + +### Bulk API와 배치 크기 + +- Elastic 문서는 bulk 요청에 정답이 되는 문서 수가 없으며, 시스템 환경에 맞는 최적값을 찾아야 한다고 설명한다【102369654029175†L753-L755】. 요청의 크기는 100 MB 이하로 제한되어야 한다【102369654029175†L753-L757】. +- Elastic 커뮤니티의 조언에 따르면, 1 MB~10 MB 또는 500 ~ 1000개의 문서를 한 배치로 보내는 것이 일반적이다【750595237790637†L52-L61】. 너무 큰 배치(수십 MB 이상)는 GC와 heap 부하를 유발하고, 너무 작은 배치는 네트워크 왕복이 많아진다. + +### Refresh interval 조정 + +- 색인된 문서를 검색에서 볼 수 있도록 만드는 **refresh** 작업은 비용이 크며, 지나치게 자주 호출하면 색인 속도가 저하된다. Elastic Docs는 대용량 bulk 작업을 수행할 때 `index.refresh_interval`을 `-1`로 설정하여 refresh를 비활성화한 뒤, 작업 완료 후 다시 원하는 값으로 설정할 것을 권장한다【389495691530842†L947-L986】. refresh를 비활성화하면 색인 중에는 문서가 검색되지 않지만, 로그 파이프라인은 약간의 지연을 허용하므로 적합하다. +- 기본값(1 초 refresh)은 색인량이 적고 검색량이 적을 때 최적이지만, 정기적인 검색 요청이 있는 경우 refresh interval을 30 초로 늘리면 색인 성능이 개선될 수 있다【389495691530842†L953-L966】. + +### Replica 비활성화 + +- 대량 초기 적재 시 `index.number_of_replicas`를 0으로 설정하여 색인 성능을 높일 수 있다. Elastic Docs는 초기 로드를 빠르게 끝낸 뒤 다시 원래 값으로 되돌릴 것을 권장한다【389495691530842†L1011-L1018】. 다만 replica를 0으로 설정하면 노드 장애 시 데이터 손실 위험이 있으므로 외부 저장소나 Kafka에 데이터가 안전하게 남아있어야 한다. + +### 기타 튜닝 사항 + +- **index buffer size**와 **translog flush threshold**: 노드가 heavy indexing만 수행한다면 `indices.memory.index_buffer_size`를 충분히 크게(예: 최소 512 MB / shard) 설정하여 버퍼가 너무 자주 flush되지 않도록 한다【85091753486274†L241-L266】. translog의 `flush_threshold_size`를 늘려서 디스크 flush 빈도를 줄이면 성능이 향상될 수 있다【85091753486274†L241-L266】. +- **Auto‑generated IDs**: 자체 아이디를 지정하면 Elasticsearch가 기존 문서를 조회하여 중복 여부를 확인해야 하므로 느려질 수 있다. Elastic Docs는 auto‑generated id를 사용할 때 색인 성능이 향상된다고 설명한다【389495691530842†L1054-L1060】. 하지만 로그 데이터는 추적을 위해 trace id나 timestamp를 key로 사용할 수도 있으며, 이 경우 id 조회 비용을 감수하거나 idempotent 파이프라인을 설계해야 한다. +- **Refresh interval과 replica 변경 이후 복구**: bulk 적재가 끝나면 refresh interval을 원래 값으로, replica를 원래 개수로 되돌린 뒤 필요하다면 `_forcemerge`를 수행하여 검색 성능을 최적화한다【389495691530842†L947-L1003】. + +## Node / NestJS 구현 지침 + +### BulkIndexer 서비스 인터페이스 + +```ts +interface OffsetInfo { + topic: string; + partition: number; + offset: string; +} + +interface BulkIndexer { + enqueue(document: LogDocument, offsetInfo: OffsetInfo): Promise; + flush(): Promise; +} +``` + +BulkIndexer를 NestJS의 provider로 등록하여 전체 프로세스에서 하나의 인스턴스를 사용한다. `enqueue()`는 위에서 설명한 버퍼 로직을 구현하고, `flush()`는 현재 버퍼를 `_bulk` API로 전송한다. flush는 `MAX_PARALLEL_FLUSHES`를 넘지 않는 범위에서 실행된다. + +### 서비스 연동 예시 흐름 + +1. **컨슈머** (`handleLogEvent`): DTO 파싱, 검증 후 `logIngestService.ingest(dto)`를 호출한다. +2. **logIngestService**: DTO를 Elasticsearch 문서로 변환하고 `bulkIndexer.enqueue(document, offsetInfo)`를 호출한다. 반환되는 `Promise`를 `await`하여 색인 완료를 기다린다. +3. **BulkIndexer**: 내부 버퍼에 데이터와 offset을 저장하고 플러시 조건을 점검한다. +4. **Kafka consumer config**: `eachMessage`/`handleLogEvent`에서 발생한 예외는 NestJS Kafka가 재시도를 처리하게 한다. `autoCommit: false` 또는 기본 자동 커밋 설정과 함께 at‑least‑once를 유지할 수 있도록 NestJS handler 패턴을 그대로 사용한다. + +### 제한된 동시성 구현 + +`MAX_PARALLEL_FLUSHES` 값을 통해 몇 개의 bulk 요청을 동시에 처리할지 제어한다. 예를 들어 1로 두면 항상 하나씩 flush하고, 2 이상으로 올리면 Elastic 클러스터의 thread pool과 대기열이 허용하는 범위까지 동시 요청을 늘린다. 실험을 통해 적정 값을 찾아야 한다. Elastic 커뮤니티에서는 먼저 1 MB batch / 1 thread, 다음 2 MB / 1 thread, 다음 2 MB / 2 threads 등으로 조합을 실험해 보고 throughput이 떨어지는 지점을 찾으라고 조언한다【750595237790637†L52-L61】. + +### 배치 크기 추정 + +Batch size를 문서 개수와 바이트 수 기준으로 동시에 제어하는 것이 이상적이다. 각 문서의 JSON 문자열 길이(또는 Buffer 길이)를 측정하여 누적 크기를 추적하고, 5 MB ~ 10 MB를 넘지 않도록 flush한다. Jörg Prante는 64 MB bulk가 너무 크며 GC 지연을 유발할 수 있다고 지적했다【750595237790637†L52-L61】. + +### 에러·재시도 정책 + +Elasticsearch `_bulk` API는 응답에 `items` 배열을 포함하여 각 작업의 성공/실패 여부를 알려준다. 단순한 첫 구현에서는 **하나의 문서라도 실패하면 전체 배치 실패**로 처리하여 메시지를 재수신하도록 한다. 향후에는 부분 실패 문서만 재시도하는 로직을 추가하거나 Kafka DLQ(Dead Letter Queue)를 도입할 수 있다. 클러스터가 과부하되어 `429 Too Many Requests`가 발생하는 경우에는 bulkIndexer가 **지수 백오프**를 적용하여 재시도할 수 있다【85091753486274†L118-L129】. + +### DTO 검증 최적화 + +대량 ingest 시 class‑validator 기반 DTO 검증이 병목이 될 수 있다. 필수 필드만 간단히 체크하여 검증 속도를 높이고, 복잡한 검증은 API 단이나 별도의 경로에서 수행하는 것이 좋다. 적재 모드에서는 debug 로그를 문서마다 찍지 말고 일정 건수마다 집계 로그를 찍어 I/O 오버헤드를 줄인다. + +### 멀티 파티션·병렬 컨슈머 + +Kafka topic의 파티션 수를 늘리고 동일한 consumer group으로 여러 인스턴스를 실행하면 파티션 단위로 workload가 분산된다. BulkIndexer는 프로세스 내부에서만 상태를 공유하므로 각 컨슈머 인스턴스에 하나씩 생성된다. 클러스터 전체를 고려할 때, BulkIndexer의 동시 flush 수(`MAX_PARALLEL_FLUSHES`) 곱하기 인스턴스 수가 Elasticsearch cluster의 쓰기 스레드 풀을 넘지 않도록 주의한다. 클러스터가 429 에러를 반환하면 consumer 인스턴스 수 또는 flush 동시성을 줄이고, 지수 백오프를 도입하여 쓰기 속도를 조절한다. + +## 벤치마킹과 튜닝 + +### 초기 파라미터 제안 + +| 파라미터 | 초기값 | 설명 | +|---|---|---| +| `BATCH_SIZE` | 500 ~ 1000 | 한 batch당 문서 수. 작은 값은 네트워크 오버헤드가 크고, 너무 큰 값은 ES heap/GC에 부담을 준다【750595237790637†L52-L61】. | +| `BATCH_BYTE_LIMIT` | 5 MB | 문서 크기를 합산하여 5 MB를 넘기 전에 flush. Elastic 커뮤니티에서 1 MB ~ 10 MB 사이를 권장함【750595237790637†L52-L61】. | +| `FLUSH_INTERVAL_MS` | 1000 ms | 문서가 적게 들어오는 상황에서 1 초마다 flush하여 지연을 줄인다. | +| `MAX_PARALLEL_FLUSHES` | 1 | 한 번에 하나의 bulk 요청을 수행하여 클러스터 과부하를 방지하고 관찰하기 쉽게 한다. 필요시 2 ~ 4까지 늘려 실험한다【901591924758741†L125-L143】. | + +### 튜닝 방법 + +1. **배치 크기 조정**: 모니터링 도구(Kibana, Elastic APM 등)를 통해 bulk 요청의 처리 시간(`took`), 성공률, CPU 사용량을 관찰한다. 1 MB batch / 1 thread부터 시작해서 조금씩 크기나 동시성을 늘린 뒤 throughput이 떨어지는 지점을 찾는다【750595237790637†L52-L61】. +2. **Refresh interval/replica 변경**: 대량 적재를 진행하기 직전 해당 인덱스의 `refresh_interval`을 `-1`로 변경하고, `number_of_replicas`를 0으로 변경한다. 적재가 끝나면 원래 값으로 복구하고 필요 시 force merge를 수행한다【389495691530842†L947-L1003】. +3. **동시성 조정**: bulk flush의 동시에 실행되는 개수를 늘리면 throughput이 증가할 수 있으나, `429 Too Many Requests`가 나타나거나 검색 지연이 급증하면 값을 줄여야 한다. People.ai 사례에서는 동시 프로세스를 32에서 1로 줄여 클러스터 부하가 줄었다【901591924758741†L125-L143】. 용량이 충분한 클러스터에서는 2 ~ 4까지 늘려볼 수 있다. +4. **지수 백오프**: flush가 실패하거나 `429`를 받으면 1 초, 2 초, 4 초 등 점점 길게 대기한 뒤 재시도하여 클러스터에 과도한 부하를 주지 않도록 한다【85091753486274†L118-L129】. +5. **문서 설계 최적화**: 불필요하게 큰 문서나 깊은 중첩을 피하고, 필요하지 않은 필드는 매핑에서 제외한다. Search Guard 블로그에서도 인덱싱 성능을 높이려면 불필요한 필드를 줄이고 문서 크기를 최소화해야 한다고 권고한다【71945213822368†L100-L110】. + +## 추가 고려 사항 + +### 인덱스 롤오버 및 시간 기반 설계 + +로그와 APM 데이터는 시간이 지남에 따라 방대해지므로 **데이터스트림 + ILM(인덱스 라이프사이클 관리)**를 사용하여 새로운 세그먼트를 주기적으로 롤오버하고 오래된 세그먼트를 삭제하는 것이 좋다. 롤오버 간격(예: 하루, 10GB 등)을 정의하여 각 데이터스트림이 적당한 크기를 유지하도록 한다. 이는 클러스터의 merge 부하를 줄이고 검색 성능을 안정적으로 유지한다. + +### 롤업/집계 파이프라인 분리 + +1분 단위 버킷 카운트 같은 집계 작업은 ingest 경로에서 수행하지 말고, 별도의 후처리 파이프라인(ES Transform, Aggregation API + cron job, Redis 카운터 등)으로 분리하여 색인 성능에 영향을 주지 않도록 한다. 같은 컨슈머 경로에서 집계까지 수행하면 per-message 처리 시간이 길어져 throughput을 크게 떨어뜨린다. + +### 하드웨어 및 운영 환경 + +- SSD 디스크 사용, 충분한 RAM, 그리고 Elasticsearch heap 사이즈를 전체 메모리의 50% 이하로 설정해 filesystem cache를 활용하는 것이 좋다【71945213822368†L66-L72】. 또한 swap을 비활성화하고 JVM heap 사이즈를 적절히 설정해야 한다【389495691530842†L1024-L1037】. +- Translog 크기(`flush_threshold_size`)와 인덱싱 버퍼를 조절해 flush/merge 빈도를 줄이면 성능이 향상될 수 있다【85091753486274†L241-L266】. +- Node 애플리케이션에서 CPU 집약적인 로직(class-validator, JSON 변환 등)을 줄이고, 로그 출력을 최소화해 I/O 부하를 줄인다. + +## 결론 + +Bulk API와 제한된 동시성 전략을 통해 APM 로그 파이프라인의 색인 속도를 **수십 배 이상** 향상시킬 수 있다. 핵심은 메시지를 메모리 버퍼에 모았다가 일정 크기·시간마다 `_bulk` 요청으로 묶어 보내는 **BulkIndexer**를 도입하고, 동시에 플러시되는 요청 수를 제어하여 Elasticsearch 클러스터를 과부하하지 않는 것이다. Elastic 문서는 적절한 배치 크기와 refresh interval 조정을 통해 색인 성능을 최적화할 수 있다고 강조한다【389495691530842†L947-L986】, 【102369654029175†L753-L757】. 커뮤니티 경험도 1 MB~10 MB 정도의 청크와 실험을 통한 sweet spot 찾기를 권장하며, replica를 0으로 두고 refresh를 비활성화하면 초기 적재 속도를 크게 높일 수 있다고 말한다【750595237790637†L52-L61】. 이러한 설계를 코드화하면 단건 `index()` 기반 구조를 리팩터링하여 수십만 건의 로그도 수 분 이내에 색인할 수 있을 것이다. \ No newline at end of file diff --git a/backend/rollup_metrics_spec.md b/backend/rollup_metrics_spec.md index 83ab624..f6233ff 100644 --- a/backend/rollup_metrics_spec.md +++ b/backend/rollup_metrics_spec.md @@ -6,7 +6,7 @@ 현재 APM 시스템은 다음과 같이 동작합니다: -- **데이터 수집 파이프라인**: 애플리케이션에서 생성되는 로그/스팬을 Kafka 토픽(`apm.logs`, `apm.spans`)에 발행하고, `stream‑processor` 서비스가 이를 소비하여 Elasticsearch 데이터 스트림(`logs-apm.*`, `traces-apm.*`)에 인덱싱합니다. +- **데이터 수집 파이프라인**: 애플리케이션에서 생성되는 로그/스팬을 Kafka 토픽(`apm.logs`, `apm.spans`)에 발행하고, `stream‑processor` 서비스가 이를 소비하여 Elasticsearch 데이터 스트림(`logs-apm.*`, `traces-apm.*`)에 인덱싱합니다. 이때 stream-processor는 `apm_bulk_design.md`에 정의된 BulkIndexer로 문서를 버퍼링한 뒤 `_bulk` API로 저장하며, 이 단계에서는 어떤 롤업/집계도 수행하지 않습니다. - **조회 API(Query‑API)**: NestJS 기반 서비스로, Elasticsearch에서 시간 범위에 맞는 로그/스팬을 조회하면서 집계(`percentiles`, `sum`, `filter` 등)를 수행하여 메트릭(p50, p90, p95, request_total, error_rate)을 실시간으로 계산하고 응답합니다. **문제점**: 장기 구간(예: 최근 1시간 이상)을 지속적으로 조회할 경우, 매번 원본 로그/스팬 전체를 스캔해야 하므로 Elasticsearch 및 `stream‑processor`의 부하가 증가합니다. 또한 SLA/SLO 보고서나 주간/월간 대시보드와 같이 장기 데이터를 자주 조회하는 경우 성능과 비용 문제가 발생합니다. @@ -16,6 +16,7 @@ 1. **롤업(Roll‑up) 인덱스 도입**: 로그/스팬 원본을 일정한 시간 단위(이 문서에서는 1분 버킷으로 정의)로 미리 집계하여 별도의 인덱스에 저장합니다. 이 인덱스는 p50/p90/p95, 요청 수, 오류 수 등을 포함한 요약 메트릭을 제공합니다. 2. **짧은 구간은 원본 집계 + 캐시 유지**: 최근 몇 분(예: 5분 이내)은 여전히 원본 인덱스에서 실시간 집계하여 최신 데이터를 제공합니다. 해당 부분은 Redis 캐시(10초 시간 버킷) 전략을 사용합니다. 3. **Query‑API 개선**: 요청 범위에 따라 raw 인덱스, 롤업 인덱스 또는 두 데이터를 결합하여 응답을 생성합니다. 이 과정은 완전히 자동화되어야 하며, 기존 엔드포인트와 스키마를 유지합니다. +4. **롤업/집계 파이프라인 분리**: ingest 단계(Bulk 색인)에서는 오로지 문서 저장에만 집중하고, 1분 버킷 카운트 같은 집계는 Elasticsearch Transform·Aggregation API·Cron job 등 별도 후처리 경로로 구현합니다. 컨슈머에서 집계까지 수행하면 per-message 처리 시간이 길어져 throughput이 크게 떨어지므로, 본 스펙의 모든 롤업/통계 계산은 **색인 후 비동기 파이프라인**에서 이루어져야 합니다. ## 2. 롤업 인덱스 설계 @@ -83,22 +84,19 @@ ### 3.1 Stream‑Processor 변경 -현재 `stream‑processor`는 Kafka `apm.logs`/`apm.spans` 토픽을 소비하여 raw 데이터를 ES에 인덱싱합니다. 여기에 **롤업 생산 기능**을 추가합니다. - -1. **슬라이딩 윈도우 집계** - - 프로듀서 측에서 로그/스팬을 소비할 때, `service_name`과 `environment`를 키로 하여 **1분 단위 슬라이딩 윈도우**를 관리합니다. - - RxJS 또는 Node.js의 시간 기반 버퍼링을 활용하여, 각 1분 간격이 끝날 때마다 해당 분에 속하는 이벤트들의 통계를 계산합니다. -2. **메트릭 계산** - - `duration_ms` 또는 해당 스팬/로그의 처리 시간 필드를 이용해 p50/p90/p95/p99을 계산합니다. JavaScript에서 t‑digest 알고리즘 라이브러리(예: `tdigest` npm 패키지)를 사용하거나, 간단히 배열 정렬 후 퍼센타일을 계산할 수 있습니다. - - `request_count`는 이벤트 수, `error_count`는 로그 수준(`ERROR` 이상) 또는 스팬의 `status != OK`인 건수로 정의합니다. - - `error_rate = error_count / request_count` 로 계산합니다. `request_count=0`이면 0으로 둡니다. -3. **인덱싱** - - 집계가 끝나면 위에서 정의한 필드 구조로 JSON 문서를 생성하여, 해당 서비스의 `metrics-apm.-` 데이터 스트림에 추가합니다. - - 인덱싱 시점의 타임스탬프는 버킷의 시작 시각(예: 분 경계)을 사용합니다. -4. **오류 처리** - - 롤업 인덱싱에 실패하더라도 raw 인덱싱에는 영향이 없어야 합니다. 에러 발생 시 별도의 로깅 및 재시도 로직을 넣되, 롤업 실패로 인해 전체 수집이 지연되지 않게 합니다. -5. **구성 가능성** - - 롤업 활성화 여부, 버킷 크기, 메트릭 대상(서비스/endpoint), 보존 기간 등을 환경 변수로 설정할 수 있게 만듭니다. +현재 `stream‑processor`는 Kafka `apm.logs`/`apm.spans`를 BulkIndexer를 통해 raw 데이터 스트림에 저장합니다. 롤업 생산은 ingest 경로가 아닌 **별도 후처리 파이프라인**으로 구현합니다. 접근 방식은 다음 중 하나를 택합니다. + +1. **Elasticsearch Transform**: + - `logs-apm.*`와 `traces-apm.*`를 원천 인덱스로 지정하고 1분 버킷 Transform을 생성합니다. + - Transform은 elasticsearch가 관리하므로 stream‑processor의 CPU/메모리를 소모하지 않고 롤업 인덱스(`metrics-apm.*`)를 채웁니다. + - 퍼센타일 계산은 ES `percentiles` 집계를 사용합니다. +2. **별도 Aggregator(Worker)**: + - Query-API 혹은 전용 워커 프로세스가 일정 주기(예: 30초)에 raw 인덱스를 집계하여 롤업 문서를 생성한 뒤 Bulk로 색인합니다. + - 이 워커는 stream‑processor와 별도의 애플리케이션으로 운영되어 ingest 경로와 자원을 공유하지 않습니다. +3. **Redis/메시지 기반 파이프라인**: + - 임시 저장소에 엔드포인트별 카운터를 적재 후, 일정 주기에 롤업 인덱스로 밀어 넣는 구조도 가능합니다. 핵심은 ingest 서비스와 집계 로직을 명확히 분리하는 것입니다. + +선택한 방식과 관계없이 롤업 파이프라인은 실패해도 raw 색인을 막지 않아야 하며, stream‑processor의 BulkIndexer 처리량에 영향을 주지 않도록 독립적으로 운영해야 합니다. ### 3.2 ES Transform 사용 (대체안) @@ -198,7 +196,7 @@ Transform 방식은 인프라 차원에서 설정하면 되지만, 이번 명세 ## 5. 배포 및 운영 지침 1. **롤업 인덱스 템플릿/ILM 적용**: 앞서 제시한 매핑과 보존 정책을 Elasticsearch 클러스터에 등록합니다. 수집 환경별(namespace별)로 템플릿 이름을 구분합니다. -2. **stream‑processor 업데이트 배포**: 롤업 집계 모듈을 통합한 후, Canary 배포 등으로 점진적으로 도입합니다. 배포 전에 Kafka lag과 ES 인덱싱 지연을 모니터링하여, 롤업 연산이 수집 성능에 미치는 영향을 확인합니다. +2. **stream‑processor 업데이트 배포**: 롤업 집계 모듈을 통합한 후, Canary 배포 등으로 점진적으로 도입합니다. 배포 전에 Kafka lag과 ES 인덱싱 지연을 모니터링하여, 롤업 연산이 **별도의 후처리 파이프라인에서 동작하고 ingest 경로를 방해하지 않는지** 확인합니다. 3. **모니터링**: 롤업 인덱스 생성률, 문서 수, 집계 지연 등을 대시보드로 구성하여 이상을 감지합니다. raw 인덱스와 롤업 인덱스의 데이터가 동일 기간에 대해 일치하는지도 확인합니다. 4. **백필(backfill)**: 롤업 도입 이전 기간에 대해 과거 데이터를 롤업 인덱스에 채워 넣어야 하는 경우, 별도의 배치 프로세스를 만들어 일정 구간씩 raw 데이터를 읽어 롤업 문서를 생성합니다. 백필 작업은 스로틀링(throttling)을 걸어서 클러스터 부하를 피하도록 합니다. diff --git a/backend/src/notification/notification.module.ts b/backend/src/notification/notification.module.ts deleted file mode 100644 index e2eccd3..0000000 --- a/backend/src/notification/notification.module.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Module } from "@nestjs/common"; -import { NotificationService } from "./notification.service"; - -@Module({ - providers: [NotificationService], - exports: [NotificationService], -}) -export class NotificationModule {} diff --git a/backend/src/notification/notification.service.ts b/backend/src/notification/notification.service.ts deleted file mode 100644 index 26758fb..0000000 --- a/backend/src/notification/notification.service.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { Injectable, Logger } from "@nestjs/common"; -import { Transporter } from "nodemailer"; -import * as nodemailer from "nodemailer"; - -/** - * 알람을 울릴 때 필요한건 - * 서비스명 - * 현재수치 - * 메트릭이름 - * 인증관련은 미들웨어 - */ -export interface SLOAlertData { - service: string; - metric: string; - currentValue: number; -} - -@Injectable() -export class NotificationService { - private readonly transporter: Transporter | null = null; - private readonly fromEmail: string; - private readonly defaultRecipient: string; - private readonly logger = new Logger(NotificationService.name); - - constructor() { - const gmailUser = process.env.GMAIL_USER; - const gmailAppPassword = process.env.GMAIL_APP_PASSWORD; - this.defaultRecipient = process.env.EMAIL_DEFAULT_RECIPIENT ?? ""; - this.fromEmail = gmailUser ?? "no-reply@panopticon.com"; - - if (!gmailUser || !gmailAppPassword) { - this.logger.warn( - "Gmail credentials not set (GMAIL_USER, GMAIL_APP_PASSWORD). Email notifications will be disabled.", - ); - return; - } - - this.transporter = nodemailer.createTransport({ - service: "gmail", - auth: { - user: gmailUser, - pass: gmailAppPassword, - }, - }); - - this.transporter.verify((error) => { - if (error) { - this.logger.error("Transporter verification failed:", error); - } else { - this.logger.log("✅ Transporter is ready to send emails"); - } - }); - - this.logger.log("Email provider initialized2"); - } - - /** - * SLO 위반 알림 전송 (Slack + Email) - */ - async sendSLOAlert(data: SLOAlertData): Promise { - if (!this.transporter) { - this.logger.warn("Email transporter not initialized. Email not sent."); - return false; - } - - try { - const recipients = this.defaultRecipient; - - if (recipients.length === 0) { - this.logger.warn("No email recipients specified. Email not sent."); - return false; - } - - await this.transporter.sendMail({ - from: `"Panopticon Alert" <${this.fromEmail}>`, - to: recipients, - subject: "Panopticon Alert", - text: `비~상~ ${data.service}의 ${data.metric}이 ${data.currentValue}..!`, - }); - - this.logger.log(`Email sent to ${recipients}`); - return true; - } catch (error) { - this.logger.error("Failed to send email", error); - return false; - } - } -} diff --git a/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts b/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts index 8c49ab2..2f50ff1 100644 --- a/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts +++ b/backend/src/stream-processor/apm/log-ingest/log-ingest.module.ts @@ -1,9 +1,10 @@ import { Module } from "@nestjs/common"; import { ApmInfrastructureModule } from "../../../shared/apm/apm.module"; +import { BulkIngestModule } from "../../common/bulk-ingest.module"; import { LogIngestService } from "./log-ingest.service"; @Module({ - imports: [ApmInfrastructureModule], + imports: [ApmInfrastructureModule, BulkIngestModule], providers: [LogIngestService], exports: [LogIngestService], }) diff --git a/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts b/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts index e8ff95b..7d34e8f 100644 --- a/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts +++ b/backend/src/stream-processor/apm/log-ingest/log-ingest.service.ts @@ -1,14 +1,17 @@ import { Injectable } from "@nestjs/common"; -import { ApmLogRepository } from "../../../shared/apm/logs/log.repository"; import type { LogEventDto } from "../../../shared/apm/logs/dto/log-event.dto"; import type { LogDocument } from "../../../shared/apm/logs/log.document"; +import type { LogStreamKey } from "../../../shared/logs/log-storage.service"; +import { BulkIndexerService } from "../../common/bulk-indexer.service"; /** * Kafka에서 들어온 로그 이벤트를 Elasticsearch에 저장하는 서비스 */ @Injectable() export class LogIngestService { - constructor(private readonly repository: ApmLogRepository) {} + private static readonly STREAM_KEY: LogStreamKey = "apmLogs"; + + constructor(private readonly bulkIndexer: BulkIndexerService) {} async ingest(dto: LogEventDto): Promise { const document: LogDocument = { @@ -27,7 +30,8 @@ export class LogIngestService { ingestedAt: new Date().toISOString(), }; - await this.repository.save(document); + // 로그 문서를 bulk 버퍼에 적재하고 flush가 끝날 때까지 기다린다. + await this.bulkIndexer.enqueue(LogIngestService.STREAM_KEY, document); } /** diff --git a/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts b/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts index fc51b50..a154c13 100644 --- a/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts +++ b/backend/src/stream-processor/apm/span-ingest/span-ingest.module.ts @@ -1,9 +1,10 @@ import { Module } from "@nestjs/common"; import { ApmInfrastructureModule } from "../../../shared/apm/apm.module"; +import { BulkIngestModule } from "../../common/bulk-ingest.module"; import { SpanIngestService } from "./span-ingest.service"; @Module({ - imports: [ApmInfrastructureModule], + imports: [ApmInfrastructureModule, BulkIngestModule], providers: [SpanIngestService], exports: [SpanIngestService], }) diff --git a/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts b/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts index 20443ec..9efca3c 100644 --- a/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts +++ b/backend/src/stream-processor/apm/span-ingest/span-ingest.service.ts @@ -1,14 +1,17 @@ import { Injectable } from "@nestjs/common"; -import { SpanRepository } from "../../../shared/apm/spans/span.repository"; import type { SpanEventDto } from "../../../shared/apm/spans/dto/span-event.dto"; import type { SpanDocument } from "../../../shared/apm/spans/span.document"; +import type { LogStreamKey } from "../../../shared/logs/log-storage.service"; +import { BulkIndexerService } from "../../common/bulk-indexer.service"; /** * 스팬 이벤트를 Elasticsearch에 저장하는 서비스 */ @Injectable() export class SpanIngestService { - constructor(private readonly repository: SpanRepository) {} + private static readonly STREAM_KEY: LogStreamKey = "apmSpans"; + + constructor(private readonly bulkIndexer: BulkIndexerService) {} async ingest(dto: SpanEventDto): Promise { const document: SpanDocument = { @@ -30,7 +33,8 @@ export class SpanIngestService { ingestedAt: new Date().toISOString(), }; - await this.repository.save(document); + // 스팬 문서를 BulkIndexer 버퍼에 적재하여 색인 지연 시간을 줄인다. + await this.bulkIndexer.enqueue(SpanIngestService.STREAM_KEY, document); } /** diff --git a/backend/src/stream-processor/common/bulk-indexer.service.ts b/backend/src/stream-processor/common/bulk-indexer.service.ts new file mode 100644 index 0000000..f2528ec --- /dev/null +++ b/backend/src/stream-processor/common/bulk-indexer.service.ts @@ -0,0 +1,210 @@ +import { Injectable, Logger, OnModuleDestroy } from "@nestjs/common"; +import type { BaseApmDocument } from "../../shared/apm/common/base-apm.repository"; +import { + LogStorageService, + type LogStreamKey, +} from "../../shared/logs/log-storage.service"; +import type { Client } from "@elastic/elasticsearch"; + +interface BufferedItem { + index: string; + document: BaseApmDocument; + size: number; + resolve: () => void; + reject: (error: Error) => void; +} + +/** + * Elasticsearch Bulk API를 이용해 로그/스팬을 배치 단위로 색인하는 유틸리티 + * - 버퍼에 문서를 모았다가 크기/시간 조건을 만족하면 NDJSON 형태로 전송 + * - 동시 플러시 수를 제한해 ES 클러스터 과부하를 막는다 + */ +@Injectable() +export class BulkIndexerService implements OnModuleDestroy { + private readonly logger = new Logger(BulkIndexerService.name); + private readonly client: Client; + private readonly maxBatchSize: number; + private readonly maxBatchBytes: number; + private readonly flushIntervalMs: number; + private readonly maxParallelFlushes: number; + + private buffer: BufferedItem[] = []; + private bufferedBytes = 0; + private flushTimer: NodeJS.Timeout | null = null; + private inFlightFlushes = 0; + private pendingFlush = false; + + constructor(private readonly storage: LogStorageService) { + this.client = this.storage.getClient(); + this.maxBatchSize = Math.max( + 1, + Number.parseInt(process.env.BULK_BATCH_SIZE ?? "500", 10), + ); + const byteLimitMb = Number.parseFloat( + process.env.BULK_BATCH_BYTES_MB ?? "5", + ); + this.maxBatchBytes = Math.max(1024, Math.floor(byteLimitMb * 1024 * 1024)); + this.flushIntervalMs = Math.max( + 100, + Number.parseInt(process.env.BULK_FLUSH_INTERVAL_MS ?? "1000", 10), + ); + this.maxParallelFlushes = Math.max( + 1, + Number.parseInt(process.env.BULK_MAX_PARALLEL_FLUSHES ?? "1", 10), + ); + } + + /** + * Bulk 버퍼에 문서를 추가하고 조건을 만족하면 즉시 플러시한다. + */ + enqueue(streamKey: LogStreamKey, document: BaseApmDocument): Promise { + const indexName = this.storage.getDataStream(streamKey); + const size = + Buffer.byteLength(JSON.stringify({ index: { _index: indexName } })) + + Buffer.byteLength(JSON.stringify(document)) + + 2; + + return new Promise((resolve, reject) => { + this.buffer.push({ index: indexName, document, size, resolve, reject }); + this.bufferedBytes += size; + if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else { + this.ensureFlushTimer(); + } + }); + } + + async onModuleDestroy(): Promise { + if (this.buffer.length > 0) { + await this.flushRemaining(); + } + } + + /** + * 버퍼가 비어있지 않다면 즉시 flush를 수행한다. + */ + private triggerFlush(): void { + if (this.buffer.length === 0) { + return; + } + if (this.inFlightFlushes >= this.maxParallelFlushes) { + this.pendingFlush = true; + return; + } + + const batch = this.drainBuffer(); + if (batch.length === 0) { + return; + } + + this.inFlightFlushes += 1; + void this.executeFlush(batch) + .catch((error) => { + this.logger.error( + "Bulk 색인 도중 예기치 않은 오류가 발생했습니다.", + error instanceof Error ? error.stack : String(error), + ); + }) + .finally(() => { + this.inFlightFlushes -= 1; + if (this.pendingFlush) { + this.pendingFlush = false; + this.triggerFlush(); + } else if (this.shouldFlushBySize()) { + this.triggerFlush(); + } else if (this.buffer.length > 0) { + this.ensureFlushTimer(); + } + }); + } + + private shouldFlushBySize(): boolean { + return ( + this.buffer.length >= this.maxBatchSize || + this.bufferedBytes >= this.maxBatchBytes + ); + } + + private ensureFlushTimer(): void { + if (this.flushTimer || this.buffer.length === 0) { + return; + } + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.triggerFlush(); + }, this.flushIntervalMs); + } + + private drainBuffer(): BufferedItem[] { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + const batch = this.buffer; + this.buffer = []; + this.bufferedBytes = 0; + return batch; + } + + private async executeFlush(batch: BufferedItem[]): Promise { + const operations = this.buildOperations(batch); + try { + const response = await this.client.bulk({ operations }); + if (response.errors) { + this.logBulkError(response); + const error = new Error("Bulk 색인 중 일부 문서가 실패했습니다."); + batch.forEach((item) => item.reject(error)); + return; + } + batch.forEach((item) => item.resolve()); + this.logger.debug( + `Bulk 색인 완료 batch=${batch.length} took=${response.took ?? 0}ms`, + ); + } catch (error) { + const wrapped = + error instanceof Error + ? error + : new Error(`Bulk 색인 실패: ${String(error)}`); + batch.forEach((item) => item.reject(wrapped)); + this.logger.warn( + "Bulk 색인 요청이 실패했습니다. Kafka 컨슈머가 재시도합니다.", + wrapped.stack, + ); + } + } + + private buildOperations( + batch: BufferedItem[], + ): Array> { + const operations: Array> = []; + for (const item of batch) { + // 데이터 스트림은 create op만 허용하므로 bulk 액션을 create로 지정한다. + operations.push({ create: { _index: item.index } }); + operations.push({ ...item.document }); + } + return operations; + } + + private logBulkError(response: { items?: Array> }): void { + const firstError = response.items + ?.map((item) => Object.values(item)[0]) + .find((result) => result && result.error); + if (firstError) { + this.logger.error( + `Bulk 색인 실패: type=${firstError.error?.type} reason=${firstError.error?.reason}`, + ); + } else { + this.logger.error("Bulk 색인 실패: 응답 내 오류 세부 정보 없음"); + } + } + + private async flushRemaining(): Promise { + while (this.buffer.length > 0 || this.inFlightFlushes > 0) { + if (this.buffer.length > 0) { + this.triggerFlush(); + } + await new Promise((resolve) => setTimeout(resolve, 50)); + } + } +} diff --git a/backend/src/stream-processor/common/bulk-ingest.module.ts b/backend/src/stream-processor/common/bulk-ingest.module.ts new file mode 100644 index 0000000..34a4926 --- /dev/null +++ b/backend/src/stream-processor/common/bulk-ingest.module.ts @@ -0,0 +1,14 @@ +import { Global, Module } from "@nestjs/common"; +import { ApmInfrastructureModule } from "../../shared/apm/apm.module"; +import { BulkIndexerService } from "./bulk-indexer.service"; + +/** + * Bulk 인덱싱 서비스를 전역으로 제공하는 모듈 + */ +@Global() +@Module({ + imports: [ApmInfrastructureModule], + providers: [BulkIndexerService], + exports: [BulkIndexerService], +}) +export class BulkIngestModule {} diff --git a/backend/src/stream-processor/sample-producer/send-sample-span.ts b/backend/src/stream-processor/sample-producer/send-sample-span.ts index 98d53ce..f3ba624 100644 --- a/backend/src/stream-processor/sample-producer/send-sample-span.ts +++ b/backend/src/stream-processor/sample-producer/send-sample-span.ts @@ -34,7 +34,7 @@ async function main(): Promise { parent_span_id: null, name: "GET /api/sample", kind: "SERVER", - duration_ms: 23.5, + duration_ms: 50.5, status: "OK", http_method: "GET", http_path: "/api/sample", diff --git a/k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md b/k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md deleted file mode 100644 index ba4ee0f..0000000 --- a/k8s_userside_log_generator/LOG_COLLECTOR_METRICS_SETUP.md +++ /dev/null @@ -1,97 +0,0 @@ -# Log Collector CPU 메트릭 수집 설정 - -쿠버네티스의 **log-collector Pod (3001번 포트 서버)** CPU 사용량을 수집합니다. - ---- - -## 로컬에서 테스트하는 방법 - -### 1. 사전 준비 -- Docker Desktop 실행 -- kind 클러스터 실행 중 -- Kafka/Elasticsearch/Redis 실행 중 (`cd infra && docker-compose up -d kafka elasticsearch redis`) - -### 2. FluentBit 설정 적용 - -```bash -# 1. 설정 배포 -cd k8s_userside_log_generator/k8s_http_to_flu_to_server -kubectl apply -f fluent-bit.yaml - -# 2. FluentBit 재시작 -kubectl rollout restart daemonset/fluent-bit -n default - -# 3. 재시작 완료 대기 (30초~1분) -kubectl rollout status daemonset/fluent-bit -n default -``` - -### 3. 백엔드 실행 - -```bash -cd backend -npm run start:dev:metrics-consumer -``` - -**30초마다 이런 로그가 나오면 성공:** -``` -[SYSTEM METRIC] service=log-collector pod=log-collector-xxx CPU=15.00% Memory=200Mi -``` - -### 4. 데이터 확인 - -```bash -# Query API 메트릭 엔드포인트 확인 -curl "http://localhost:3001/services/log-collector/metrics?metric=http_requests_total" -``` - ---- - -## test-log-collector-metrics.sh 파일 - -자동 테스트 스크립트입니다. 다음을 자동으로 확인합니다: -- FluentBit Pod 상태 -- log-collector Pod 상태 -- FluentBit 설정 -- Kafka 메시지 - -**사용법:** -```bash -cd k8s_userside_log_generator -./test-log-collector-metrics.sh -``` - ---- - -## 트러블슈팅 - -### "데이터가 안 들어와요" - -1. **log-collector Pod 확인** - ```bash - kubectl get pods | grep log-collector - ``` - -2. **FluentBit 재시작** - ```bash - kubectl rollout restart daemonset/fluent-bit -n default - ``` - -3. **30초 기다리기** (FluentBit이 30초마다 수집함) - -4. **Kafka 확인** - ```bash - docker exec panopticon-kafka /opt/kafka/bin/kafka-console-consumer.sh \ - --bootstrap-server localhost:9092 \ - --topic metrics.system \ - --max-messages 5 - ``` - ---- - -## 요약 - -1. `kubectl apply -f fluent-bit.yaml` - 설정 적용 -2. `kubectl rollout restart daemonset/fluent-bit` - 재시작 -3. `npm run start:dev:metrics-consumer` - 백엔드 실행 -4. 30초 기다리기 -5. Query API에서 확인 (`GET /services/{serviceName}/metrics`) diff --git a/k8s_userside_log_generator/README.md b/k8s_userside_log_generator/README.md deleted file mode 100644 index 578d508..0000000 --- a/k8s_userside_log_generator/README.md +++ /dev/null @@ -1,164 +0,0 @@ -# 테스트 소개 - -## 테스트 종류 - -## 1. k8s-http-to-flu-to-server - -해당 프로그램은 쿠버네티스 환경에서 **HTTP API 호출**을 통해 로그를 생성하고, -fluent-bit이 이를 감지하여 수집서버로 전달합니다. -**Ingress**를 통해 외부에서 API에 접근 가능하며, 실제 프로덕션 환경과 유사한 테스트입니다. - -### 사용법 - -#### 방법 1: 자동 배포 스크립트 (권장) - -```bash -cd log_collect_tests/k8s_http_to_flu_to_server -./deploy.sh -``` - -스크립트가 자동으로 다음을 수행합니다: - -- Kind 클러스터 확인 -- 기존 FluentBit 리소스 정리 (충돌 방지) -- Docker 이미지 빌드 -- Kind 클러스터에 이미지 로드 -- Kubernetes 리소스 배포 (Ingress 포함) -- 파드 준비 상태 대기 -- 사용법 안내 - -**리소스 삭제:** - -```bash -./cleanup.sh -``` - -**Kind 클러스터가 없는 경우:** - -```bash -# kind-config.yaml을 사용하여 클러스터 생성 (Ingress 설정 포함) -kind create cluster --config kind-config.yaml --name log-cluster - -# Ingress Controller 설치 (deploy.sh가 자동으로 확인 및 설치하지만, 수동 설치 시) -kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml - -# Ingress Controller 준비 확인 -kubectl wait --namespace ingress-nginx \ - --for=condition=ready pod \ - --selector=app.kubernetes.io/component=controller \ - --timeout=90s - -# control-plane에 설치되었는지 확인 -kubectl get pods -n ingress-nginx -o wide -``` - -#### 방법 2: 수동 배포 - -1. **Docker 이미지 빌드** - - ```bash - # 수집서버 이미지 빌드 - cd log_collect_tests/log_collect_server - docker build -t log-collector:latest . - - # 생성서버 이미지 빌드 - cd ../log_generator_server - docker build -t log-generator:latest . - ``` - -2. **Kind 클러스터에 이미지 로드** - - ```bash - kind load docker-image log-collector:latest --name log-cluster - kind load docker-image log-generator:latest --name log-cluster - ``` - -3. **Kubernetes 리소스 배포** - - ```bash - cd ../k8s_http_to_flu_to_server - - kubectl apply -f log-generator-deployment.yaml - kubectl apply -f log-collect-deployment.yaml - kubectl apply -f fluent-bit.yaml - kubectl apply -f ingress.yaml - ``` - -4. **배포 확인** - - - `kubectl get pods` - 모든 파드가 Running 상태인지 확인 - - `kubectl get services` - 서비스가 정상 생성되었는지 확인 - - `kubectl get ingress` - Ingress가 localhost로 매핑되었는지 확인 - -5. **테스트** - - ```bash - # API 호출 테스트 (단건) - curl http://localhost:8080/api/users/3 - - # 자동 로그 생성 (10회) - curl http://localhost:8080/api/autolog - ``` - -6. **로그 확인** - - ```bash - # 수집서버 로그 확인 (FluentBit이 전달한 로그 확인) - kubectl logs -l app=log-collector -f - - # 생성서버 로그 확인 (원본 로그 확인) - kubectl logs -l app=log-generator -f - ``` - -7. **정상 작동 확인** - - `curl http://localhost:8080/api/autolog` 호출 시 - - 생성서버에서 10번의 로그 출력 확인 - - 수집서버에서 FluentBit을 통해 전달된 로그 수신 확인 - -### 환경설정 - -- dockerDesktop → 환경설정 → kubernetes on -- **Kind 클러스터 요구사항:** - - `kind-config.yaml`에 Ingress 설정 포함: - ```yaml - nodes: - - role: control-plane - kubeadmConfigPatches: - - | - kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - node-labels: "ingress-ready=true" - extraPortMappings: - - containerPort: 80 - hostPort: 8080 # localhost:8080으로 접근 - protocol: TCP - ``` - - Nginx Ingress Controller 설치 (deploy.sh가 자동 설치) - -### 유용한 디버깅 명령어 - -```bash -# 전체 리소스 확인 -kubectl get all - -# 파드 상태 상세 확인 -kubectl describe pod - -# 파드 로그 확인 (최근 100줄) -kubectl logs --tail=100 - -# 파드 실시간 로그 -kubectl logs -f - -# 레이블별 로그 확인 -kubectl logs -l app=log-generator -f - -# 이벤트 확인 (문제 진단) -kubectl get events --sort-by='.lastTimestamp' - -# 리소스 사용량 -kubectl top pods -``` - ---- diff --git a/k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh b/k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh deleted file mode 100755 index 7adec2a..0000000 --- a/k8s_userside_log_generator/k8s_http_to_flu_to_server/cleanup.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -set -e - -echo "🧹 k8s-http-to-flu-to-server 리소스 삭제 시작" -echo "" - -# 현재 스크립트 위치 기준으로 경로 설정 -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -cd "$SCRIPT_DIR" - -echo "🗑️ Kubernetes 리소스 삭제 중..." - -kubectl delete -f ingress.yaml --ignore-not-found=true -kubectl delete -f fluent-bit.yaml --ignore-not-found=true -kubectl delete -f log-collect-deployment.yaml --ignore-not-found=true -kubectl delete -f log-generator-deployment.yaml --ignore-not-found=true - -echo "" -echo "✅ 모든 리소스 삭제 완료!" -echo "" - -# 남은 파드 확인 -REMAINING_PODS=$(kubectl get pods --no-headers 2>/dev/null | wc -l | tr -d ' ') - -if [ "$REMAINING_PODS" -gt 0 ]; then - echo "📊 남은 파드:" - kubectl get pods -else - echo "✅ 모든 파드가 삭제되었습니다." -fi - -echo "" diff --git a/k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh b/k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh deleted file mode 100755 index 66d6e65..0000000 --- a/k8s_userside_log_generator/k8s_http_to_flu_to_server/deploy.sh +++ /dev/null @@ -1,147 +0,0 @@ -#!/bin/bash -set -e - -echo "🚀 k8s-http-to-flu-to-server 배포 시작" -echo "📍 대상: Kind 클러스터 (log-cluster)" -echo "" - -# 현재 스크립트 위치 기준으로 경로 설정 -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" - -echo "📂 프로젝트 경로: $PROJECT_ROOT" -echo "" - -# Kind 클러스터 확인 -if ! kind get clusters | grep -q "log-cluster"; then - echo "❌ 'log-cluster' Kind 클러스터를 찾을 수 없습니다." - echo "" - echo "다음 명령어로 클러스터를 생성하세요:" - echo " kind create cluster --config $SCRIPT_DIR/kind-config.yaml --name log-cluster" - echo "" - exit 1 -fi - -echo "✅ Kind 클러스터 확인 완료" -echo "" - -# Kind 클러스터 context 설정 -kubectl config use-context kind-log-cluster > /dev/null 2>&1 - -# Ingress Controller 확인 및 설치 -echo "🔍 Ingress Controller 확인 중..." -if ! kubectl get pods -n ingress-nginx | grep -q "ingress-nginx-controller"; then - echo "📦 Ingress Controller 설치 중..." - kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml - - echo "⏳ Ingress Controller가 준비될 때까지 대기 중..." - kubectl wait --namespace ingress-nginx \ - --for=condition=ready pod \ - --selector=app.kubernetes.io/component=controller \ - --timeout=90s - - echo "✅ Ingress Controller 설치 완료" -else - echo "✅ Ingress Controller 이미 설치됨" - - # control-plane에서 실행 중인지 확인 - INGRESS_NODE=$(kubectl get pods -n ingress-nginx -o wide | grep ingress-nginx-controller | awk '{print $7}') - if [[ "$INGRESS_NODE" == *"control-plane"* ]]; then - echo " └─ control-plane에서 실행 중 ✓" - else - echo " └─ ⚠️ $INGRESS_NODE 에서 실행 중 (control-plane이 아님)" - fi -fi -echo "" - -# 1. 기존 FluentBit 삭제 (다른 테스트의 FluentBit 설정과 충돌 방지) -echo "🧹 기존 FluentBit 리소스 정리..." -kubectl delete daemonset fluent-bit --ignore-not-found=true -kubectl delete configmap fluent-bit-config --ignore-not-found=true -kubectl delete serviceaccount fluent-bit --ignore-not-found=true -kubectl delete clusterrole fluent-bit --ignore-not-found=true -kubectl delete clusterrolebinding fluent-bit --ignore-not-found=true -echo "✅ 기존 FluentBit 정리 완료" -echo "" - -# 2. Docker 이미지 빌드 -echo "🔨 Docker 이미지 빌드 중..." - -echo " - log-collector 이미지 빌드..." -cd "$PROJECT_ROOT/k8s_userside_log_generator/log_collect_server" -docker build -t log-collector:latest . -q - -echo " - log-generator 이미지 빌드..." -cd "$PROJECT_ROOT/k8s_userside_log_generator/log_generator_server" -docker build -t log-generator:latest . -q - -echo "✅ Docker 이미지 빌드 완료" -echo "" - -# 3. Kind 클러스터에 이미지 로드 -echo "📦 Kind 클러스터에 이미지 로드 중..." -kind load docker-image log-collector:latest --name log-cluster -kind load docker-image log-generator:latest --name log-cluster -echo "✅ 이미지 로드 완료" -echo "" - -# 4. Kubernetes 리소스 배포 -echo "☸️ Kubernetes 리소스 배포 중..." -cd "$SCRIPT_DIR" - -kubectl apply -f log-generator-deployment.yaml -kubectl apply -f log-collect-deployment.yaml -kubectl apply -f fluent-bit.yaml -kubectl apply -f ingress.yaml - -echo "✅ Kubernetes 리소스 배포 완료" -echo "" - -# 5. 배포 상태 확인 -echo "⏳ 파드가 준비될 때까지 대기 중..." -sleep 3 - -kubectl wait --for=condition=ready pod -l app=log-collector --timeout=60s -kubectl wait --for=condition=ready pod -l app=log-generator --timeout=60s -kubectl wait --for=condition=ready pod -l app=fluent-bit --timeout=60s - -echo "" -echo "✅ 모든 파드 준비 완료!" -echo "" - -# 6. 배포 확인 -echo "📊 배포 상태 확인:" -echo "" -echo "Pods:" -kubectl get pods -echo "" -echo "Services:" -kubectl get services -echo "" -echo "Ingress:" -kubectl get ingress -echo "" - -# 7. 사용법 안내 -echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -echo "✅ k8s-http-to-flu-to-server 배포 완료!" -echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -echo "" -echo "📝 테스트 방법:" -echo "" -echo "1. API 호출 테스트 (단건):" -echo " curl http://localhost:8080/api/users/3" -echo "" -echo "2. 자동 로그 생성 (10회):" -echo " curl http://localhost:8080/api/autolog" -echo "" -echo "3. 수집서버 로그 확인 (FluentBit이 전달한 로그):" -echo " kubectl logs -l app=log-collector -f" -echo "" -echo "4. 생성서버 로그 확인 (원본 로그):" -echo " kubectl logs -l app=log-generator -f" -echo "" -echo "5. FluentBit 로그 확인:" -echo " kubectl logs -l app=fluent-bit -f" -echo "" -echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" diff --git a/k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml b/k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml deleted file mode 100644 index 5b36e2d..0000000 --- a/k8s_userside_log_generator/k8s_http_to_flu_to_server/fluent-bit.yaml +++ /dev/null @@ -1,229 +0,0 @@ -apiVersion: v1 -kind: ServiceAccount -metadata: - name: fluent-bit ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: fluent-bit -rules: - - apiGroups: [""] - resources: - - namespaces - - pods - verbs: ["get", "list", "watch"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: fluent-bit -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: fluent-bit -subjects: - - kind: ServiceAccount - name: fluent-bit - namespace: default ---- -apiVersion: v1 -kind: ConfigMap -metadata: - name: fluent-bit-config -data: - fluent-bit.conf: | - [SERVICE] - Flush 5 - Daemon Off - Log_Level info - Parsers_File parsers.conf - - [INPUT] - Name tail - Path /var/log/containers/*ingress-nginx-controller*.log - Tag kube.ingress.* - Refresh_Interval 5 - Mem_Buf_Limit 5MB - Skip_Long_Lines Off - - [INPUT] - Name tail - Path /var/log/containers/*log-generator*.log - Tag kube.backend.* - Refresh_Interval 5 - Mem_Buf_Limit 5MB - Skip_Long_Lines Off - - # === Ingress 로그 파싱 === - - # 1단계: Docker 컨테이너 로그 형식 파싱 - [FILTER] - Name parser - Match kube.ingress.* - Key_Name log - Parser cri_log - Reserve_Data On - - # 2단계: Nginx access log 파싱 - [FILTER] - Name parser - Match kube.ingress.* - Key_Name message - Parser nginx_ingress - Reserve_Data On - - # 3단계: 필드 추가 - [FILTER] - Name modify - Match kube.ingress.* - Add log_type http - - # === Backend 로그 파싱 === - - # 1단계: Docker 컨테이너 로그 형식 파싱 - [FILTER] - Name parser - Match kube.backend.* - Key_Name log - Parser cri_log - Reserve_Data On - - # 2단계: JSON 내용 파싱 - [FILTER] - Name parser - Match kube.backend.* - Key_Name message - Parser json_log - Reserve_Data On - - # 3단계: log_type이 없는 로그에 application 추가 (필드가 없을 때만 추가됨) - [FILTER] - Name modify - Match kube.backend.* - Add log_type application - - # 4단계: log_type에 따라 태그 분리 - [FILTER] - Name rewrite_tag - Match kube.backend.* - Rule $log_type ^metrics$ kube.metrics false - Rule $log_type ^application$ kube.app false - Emitter_Name re_emitted - - # 4단계: log-collector 필드 추가 - [FILTER] - Name modify - Match metrics.system.log-collector - Add service log-collector - Add namespace default - Add nodeName kind-control-plane - - # === 공통 정리 === - - # 불필요한 필드 제거 - [FILTER] - Name record_modifier - Match * - # Remove_key log - # Remove_key message - # Remove_key stream - Remove_key logtag - # Remove_key time - Remove_key _p - - # === Kafka 출력 === - - [OUTPUT] - Name kafka - Match kube.ingress.* - Brokers 172.21.100.253:9092 - Topics logs.http - Format json - - [OUTPUT] - Name kafka - Match kube.app - Brokers 172.21.100.253:9092 - Topics logs.app - Format json - - [OUTPUT] - Name kafka - Match kube.metrics - Brokers 172.21.100.253:9092 - Topics logs.metric - Format json - - parsers.conf: | - # CRI/Kubernetes 컨테이너 로그 형식 파서 - # 형식: - # 예: 2025-11-03T17:19:58.353137053Z stdout F actual log content here - [PARSER] - Name cri_log - Format regex - Regex ^(?