Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: CD - Build & Deploy
on:
push:
branches:
- feature/aggregator
- feature/read-pre-aggregate
# - main
workflow_dispatch:

Expand Down
24 changes: 24 additions & 0 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ backend/src
| `panopticon-query-api` | `query-api` | 브라우저 요청을 받아 OpenSearch를 조회하는 읽기 전용 API |
| `panopticon-stream-processor` | `stream-processor` | MSK(Kafka) 스트림을 소비해 로그/스팬을 정제 후 OpenSearch에 적재 |
| `panopticon-error-stream` | `error-stream` | `apm.logs.error` 토픽을 구독해 WebSocket 으로 프런트엔드(NEXT.js)에 실시간 전송 |
| `panopticon-aggregator` | `aggregator` | `metrics-apm` 롤업 인덱스를 채우는 1분 버킷 집계 전용 워커 |

### Build & Push

Expand All @@ -28,6 +29,9 @@ docker build -f backend/Dockerfile -t panopticon-stream-processor --target strea
# Error Stream (Kafka → WebSocket)
docker build -f backend/Dockerfile -t panopticon-error-stream --target error-stream backend

# Aggregator (Roll-up 워커)
docker build -f backend/Dockerfile -t panopticon-aggregator --target aggregator backend

# (선택) ECR 로그인 및 푸시
aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <account>.dkr.ecr.<region>.amazonaws.com
docker tag panopticon-query-api:latest <account>.dkr.ecr.<region>.amazonaws.com/panopticon-query-api:latest
Expand All @@ -42,9 +46,11 @@ ECS 태스크 정의에서는 각 이미지를 별도 컨테이너로 등록하

- `npm run build:query-api` / `npm run build:stream-processor`: 각 서버만 컴파일
- `npm run build:error-stream`: WebSocket 기반 에러 스트림 서버 컴파일
- `npm run build:aggregator`: 롤업 워커(1분 집계)만 컴파일
- `npm run start:prod`: `dist/query-api/query-api/main.js` 실행 (읽기 API)
- `npm run start:stream-processor:prod`: `dist/stream-processor/stream-processor/main.js` 실행 (Kafka 컨슈머)
- `npm run start:error-stream:prod`: `dist/error-stream/main.js` 실행 (Kafka→WebSocket 브리지)
- `npm run start:aggregator:prod`: `dist/aggregator/main.js` 실행 (roll-up 워커)

### Error Stream 환경 변수

Expand Down Expand Up @@ -78,6 +84,24 @@ Kafka 컨슈머 처리량을 주기적으로 파악하고 싶다면 다음 환
| `BULK_FLUSH_INTERVAL_MS` | `1000` | 위 조건을 만족하지 않아도 해당 시간이 지나면 주기적으로 flush 합니다. |
| `BULK_MAX_PARALLEL_FLUSHES` | `1` | 동시에 실행할 bulk 요청 개수. 클러스터 부하에 맞게 1~4 사이에서 조정하세요. |

### Aggregator & Query API 롤업 설정

`rollup_metrics_spec.md`에 정의된 대로 1분 버킷 롤업을 도입했습니다. `panopticon-aggregator` 컨테이너가 `metrics-apm` 데이터 스트림을 채우고, Query API는 긴 구간(기본 5분 이상)을 조회할 때 자동으로 롤업 데이터를 읽어 raw 집계와 결합합니다.

- Aggregator 환경 변수는 `backend/src/aggregator/README.md`에 정리되어 있습니다. 필요한 최소 값은 `ELASTICSEARCH_*` 연결 정보와 `ROLLUP_AGGREGATOR_ENABLED` 정도이며, 나머지는 기본값(1분 버킷, 15초 폴링 등)을 따릅니다.
- Query API는 다음 변수를 통해 롤업 전략을 제어합니다.

| 변수 | 기본값 | 설명 |
| --- | --- | --- |
| `ROLLUP_ENABLED` | `true` | `false`이면 항상 raw 집계만 사용합니다. |
| `ROLLUP_THRESHOLD_MINUTES` | `5` | 조회 구간 길이가 이 값 이상이면 `to - threshold` 이전 범위를 롤업 데이터로 채웁니다. |
| `ROLLUP_BUCKET_MINUTES` | `1` | 롤업 데이터가 사용하는 버킷 크기. 분 단위로 정렬/정규화할 때 사용합니다. |
| `ROLLUP_CACHE_TTL_SECONDS` | `60` | Redis에 저장되는 롤업 결과 TTL. 큰 구간 조회 시 반복 요청을 가볍게 합니다. |
| `ROLLUP_CACHE_PREFIX` | `apm:metrics-rollup` | 롤업 결과 캐시 키 접두사. raw 캐시(`METRICS_CACHE_PREFIX`)와 분리합니다. |
| `ROLLUP_MAX_QUERY_BUCKETS` | `43200` | 한 번의 롤업 조회에서 허용할 최대 버킷 수(기본 30일=43,200분). 과도한 범위를 방지합니다. |

> ⚠️ 롤업 데이터는 1분 버킷 기준으로 정렬되므로 from/to가 분 단위에 맞지 않아도 자동으로 버킷 경계에 맞춰 조회합니다. 최대 1분 이내의 오차가 있을 수 있다는 점을 염두에 두고 UX를 설계하세요.

### Query API 성능 프로파일링

서비스 메트릭 엔드포인트(`GET /services/{serviceName}/metrics`)가 Elasticsearch 집계를 수행하는데 걸린 시간을 확인하려면 `SERVICE_METRICS_PROFILE=true`를 설정하면 됩니다.
Expand Down
21 changes: 21 additions & 0 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"aws-msk-iam-sasl-signer-js": "^1.0.1",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.2",
"date-fns-tz": "^3.2.0",
"dotenv": "^17.2.3",
"ioredis": "^5.8.2",
"kafkajs": "^2.2.4",
Expand Down
7 changes: 7 additions & 0 deletions backend/src/aggregator/rollup-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ import { Injectable } from "@nestjs/common";
*/
@Injectable()
export class RollupConfigService {
// Aggregator 전체 on/off 스위치 (기본 true)
private readonly enabled =
(process.env.ROLLUP_AGGREGATOR_ENABLED ?? "true").toLowerCase() === "true";

// 롤업 버킷 길이(초). 60초 = 1분 버킷
private readonly bucketSeconds = this.parseNumber(
process.env.ROLLUP_BUCKET_SECONDS,
60,
);

// 닫힌 분을 확인하는 주기(ms). 짧을수록 최신 분을 빨리 처리한다.
private readonly pollIntervalMs = this.parseNumber(
process.env.ROLLUP_POLL_INTERVAL_MS,
15_000,
);

// 체크포인트가 없을 때 과거 몇 분까지 되돌아갈지 결정
private readonly initialLookbackMinutes = this.parseNumber(
process.env.ROLLUP_INITIAL_LOOKBACK_MINUTES,
5,
);

// 단일 분에 등장하는 서비스 수 상한(terms size). 너무 많으면 캐시/샤드 압박이 있다.
private readonly maxServiceBuckets = this.parseNumber(
process.env.ROLLUP_MAX_SERVICE_BUCKETS,
200,
Expand All @@ -34,9 +39,11 @@ export class RollupConfigService {
10,
);

// lastRolledUpAt 을 저장하는 전용 인덱스 이름
private readonly checkpointIndex =
process.env.ROLLUP_CHECKPOINT_INDEX ?? ".metrics-rollup-state";

// Aggregator가 데이터를 쓰는 롤업 데이터 스트림 접두사
private readonly rollupIndexPrefix =
process.env.ROLLUP_INDEX_PREFIX ?? "metrics-apm";

Expand Down
2 changes: 1 addition & 1 deletion backend/src/aggregator/rollup-metrics.repository.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Injectable, Logger } from "@nestjs/common";
import type { Client } from "@elastic/elasticsearch";
import { LogStorageService } from "../shared/logs/log-storage.service";
import type { RollupMetricDocument } from "./types/rollup-metric-document";
import type { RollupMetricDocument } from "../shared/apm/rollup/rollup-metric.document";

/**
* 롤업 결과를 metrics data stream 에 저장하는 책임을 가지는 레포지토리
Expand Down
8 changes: 5 additions & 3 deletions backend/src/aggregator/span-minute-aggregation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Injectable, Logger } from "@nestjs/common";
import type { Client } from "@elastic/elasticsearch";
import { LogStorageService } from "../shared/logs/log-storage.service";
import type { MinuteWindow } from "./types/minute-window.type";
import type { RollupMetricDocument } from "./types/rollup-metric-document";
import type { RollupMetricDocument } from "../shared/apm/rollup/rollup-metric.document";
import { RollupConfigService } from "./rollup-config.service";

const UNKNOWN_SERVICE = "unknown-service";
Expand Down Expand Up @@ -103,11 +103,13 @@ export class SpanMinuteAggregationService {
[];

const bucketDocs: RollupMetricDocument[] = [];
let totalSpanCount = 0;
const ingestedAt = new Date().toISOString();
const bucketDurationSeconds = this.config.getBucketDurationSeconds();

for (const serviceBucket of services) {
const serviceName = serviceBucket.key || UNKNOWN_SERVICE;
totalSpanCount += serviceBucket.doc_count ?? 0;
for (const envBucket of serviceBucket.environments.buckets) {
const environment = envBucket.key || UNKNOWN_ENVIRONMENT;
const total = envBucket.doc_count ?? 0;
Expand Down Expand Up @@ -147,11 +149,11 @@ export class SpanMinuteAggregationService {

if (bucketDocs.length === 0) {
this.logger.log(
`집계 대상 스팬이 없어 비어 있는 분을 건너뜁니다. window=${window.start.toISOString()}~${window.end.toISOString()} es_took=${took}ms`,
`집계 대상 스팬이 없어 비어 있는 분을 건너뜁니다. window=${window.start.toISOString()}~${window.end.toISOString()} spans=0 es_took=${took}ms`,
);
} else {
this.logger.log(
`스팬 집계 완료 window=${window.start.toISOString()}~${window.end.toISOString()} services=${services.length} docs=${bucketDocs.length} es_took=${took}ms`,
`스팬 집계 완료 window=${window.start.toISOString()}~${window.end.toISOString()} services=${services.length} docs=${bucketDocs.length} spans=${totalSpanCount} es_took=${took}ms`,
);
}

Expand Down
2 changes: 2 additions & 0 deletions backend/src/aggregator/window-planner.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export class MinuteWindowPlanner {
if (Number.isNaN(timestamp) || timestamp <= 0) {
return null;
}
// 현재 시각을 버킷 크기로 나눈 뒤 내림하여 "완전히 종료된 분"까지만 바라본다.
return timestamp - (timestamp % bucketMs);
}

Expand All @@ -61,6 +62,7 @@ export class MinuteWindowPlanner {
if (remainder === 0) {
return timestamp;
}
// 특정 시각을 가장 가까운 버킷 시작점으로 내림 정렬한다.
return timestamp - remainder;
}
}
Loading