diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml
index 30c880b..9ba286d 100644
--- a/.github/workflows/cd.yml
+++ b/.github/workflows/cd.yml
@@ -3,7 +3,7 @@ name: CD - Build & Deploy
 on:
   push:
     branches:
-      - feature/aggregator
+      - feature/read-pre-aggregate
       # - main
   workflow_dispatch:
 
diff --git a/backend/README.md b/backend/README.md
index f8b3a1a..3861e47 100644
--- a/backend/README.md
+++ b/backend/README.md
@@ -15,6 +15,7 @@ backend/src
 | `panopticon-query-api` | `query-api` | 브라우저 요청을 받아 OpenSearch를 조회하는 읽기 전용 API |
 | `panopticon-stream-processor` | `stream-processor` | MSK(Kafka) 스트림을 소비해 로그/스팬을 정제 후 OpenSearch에 적재 |
 | `panopticon-error-stream` | `error-stream` | `apm.logs.error` 토픽을 구독해 WebSocket으로 프런트엔드(Next.js)에 실시간 전송 |
+| `panopticon-aggregator` | `aggregator` | `metrics-apm` 롤업 인덱스를 채우는 1분 버킷 집계 전용 워커 |
 
 ### Build & Push
 
@@ -28,6 +29,9 @@ docker build -f backend/Dockerfile -t panopticon-stream-processor --target strea
 # Error Stream (Kafka → WebSocket)
 docker build -f backend/Dockerfile -t panopticon-error-stream --target error-stream backend
 
+# Aggregator (Roll-up 워커)
+docker build -f backend/Dockerfile -t panopticon-aggregator --target aggregator backend
+
 # (선택) ECR 로그인 및 푸시
 aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws-account-id>.dkr.ecr.<region>.amazonaws.com
 docker tag panopticon-query-api:latest <aws-account-id>.dkr.ecr.<region>.amazonaws.com/panopticon-query-api:latest
@@ -42,9 +46,11 @@ ECS 태스크 정의에서는 각 이미지를 별도 컨테이너로 등록하
 
 - `npm run build:query-api` / `npm run build:stream-processor`: 각 서버만 컴파일
 - `npm run build:error-stream`: WebSocket 기반 에러 스트림 서버 컴파일
+- `npm run build:aggregator`: 롤업 워커(1분 집계)만 컴파일
 - `npm run start:prod`: `dist/query-api/query-api/main.js` 실행 (읽기 API)
 - `npm run start:stream-processor:prod`: `dist/stream-processor/stream-processor/main.js` 실행 (Kafka 컨슈머)
 - `npm run start:error-stream:prod`: `dist/error-stream/main.js` 실행 (Kafka→WebSocket 브리지)
+- `npm run start:aggregator:prod`: `dist/aggregator/main.js` 실행 (roll-up 워커)
 
 ### Error Stream 환경 변수
@@ -78,6 +84,24 @@ Kafka 컨슈머 처리량을 주기적으로 파악하고 싶다면 다음 환
 | `BULK_FLUSH_INTERVAL_MS` | `1000` | 위 조건을 만족하지 않아도 해당 시간이 지나면 주기적으로 flush 합니다. |
 | `BULK_MAX_PARALLEL_FLUSHES` | `1` | 동시에 실행할 bulk 요청 개수. 클러스터 부하에 맞게 1~4 사이에서 조정하세요. |
 
+### Aggregator & Query API 롤업 설정
+
+`rollup_metrics_spec.md`에 정의된 대로 1분 버킷 롤업을 도입했습니다. `panopticon-aggregator` 컨테이너가 `metrics-apm` 데이터 스트림을 채우고, Query API는 긴 구간(기본 5분 초과)을 조회할 때 자동으로 롤업 데이터를 읽어 raw 집계와 결합합니다.
+
+- Aggregator 환경 변수는 `backend/src/aggregator/README.md`에 정리되어 있습니다. 필요한 최소 값은 `ELASTICSEARCH_*` 연결 정보와 `ROLLUP_AGGREGATOR_ENABLED` 정도이며, 나머지는 기본값(1분 버킷, 15초 폴링 등)을 따릅니다.
+- Query API는 다음 변수를 통해 롤업 전략을 제어합니다.
+
+| 변수 | 기본값 | 설명 |
+| --- | --- | --- |
+| `ROLLUP_ENABLED` | `true` | `false`이면 항상 raw 집계만 사용합니다. |
+| `ROLLUP_THRESHOLD_MINUTES` | `5` | 조회 구간 길이가 이 값을 초과하면 `to - threshold` 이전 범위를 롤업 데이터로 채웁니다. |
+| `ROLLUP_BUCKET_MINUTES` | `1` | 롤업 데이터가 사용하는 버킷 크기. 분 단위로 정렬/정규화할 때 사용합니다. |
+| `ROLLUP_CACHE_TTL_SECONDS` | `60` | Redis에 저장되는 롤업 결과 TTL. 큰 구간 조회 시 반복 요청을 가볍게 합니다. |
+| `ROLLUP_CACHE_PREFIX` | `apm:metrics-rollup` | 롤업 결과 캐시 키 접두사. raw 캐시(`METRICS_CACHE_PREFIX`)와 분리합니다. |
+| `ROLLUP_MAX_QUERY_BUCKETS` | `43200` | 한 번의 롤업 조회에서 허용할 최대 버킷 수(기본 30일=43,200분). 과도한 범위를 방지합니다. |
+
+> ⚠️ 롤업 데이터는 1분 버킷 기준으로 정렬되므로 from/to가 분 단위에 맞지 않아도 자동으로 버킷 경계에 맞춰 조회합니다. 최대 1분 이내의 오차가 있을 수 있다는 점을 염두에 두고 UX를 설계하세요.
+
 ### Query API 성능 프로파일링
 
 서비스 메트릭 엔드포인트(`GET /services/{serviceName}/metrics`)가 Elasticsearch 집계를 수행하는 데 걸린 시간을 확인하려면 `SERVICE_METRICS_PROFILE=true`를 설정하면 됩니다.
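The rollup read path described in the README section above splits a query window in two: everything older than `to - ROLLUP_THRESHOLD_MINUTES` is served from the 1-minute `metrics-apm` rollup stream, and only the newest few minutes hit raw spans. Below is a minimal TypeScript sketch of that split and of the minute-bucket alignment mentioned in the note; the names `splitWindow` and `floorToMinute` are illustrative, while the real logic is `buildFetchPlan` and `floorToBucket` in `service-metrics.service.ts` further down in this diff.

```ts
const MINUTE_MS = 60_000;

interface TimeWindow {
  from: string; // ISO-8601
  to: string;
}

interface FetchPlan {
  rollup: TimeWindow | null; // older data, read from the metrics-apm rollup stream
  raw: TimeWindow;           // newest data, aggregated from raw spans
}

// Floor a timestamp (ms) to the start of its 1-minute bucket; this is where the
// "up to one minute of skew" mentioned in the README note comes from.
function floorToMinute(ms: number): number {
  return Math.floor(ms / MINUTE_MS) * MINUTE_MS;
}

// Split [from, to) at `to - threshold`. Short or invalid ranges stay raw-only.
function splitWindow(from: string, to: string, thresholdMinutes = 5): FetchPlan {
  const fromMs = Date.parse(from);
  const toMs = Date.parse(to);
  const thresholdMs = thresholdMinutes * MINUTE_MS;

  if (!Number.isFinite(fromMs) || !Number.isFinite(toMs) || toMs - fromMs <= thresholdMs) {
    return { rollup: null, raw: { from, to } };
  }

  const split = new Date(toMs - thresholdMs).toISOString();
  return { rollup: { from, to: split }, raw: { from: split, to } };
}

// A 60-minute query keeps only the newest 5 minutes on raw spans.
console.log(splitWindow("2024-05-01T10:00:00Z", "2024-05-01T11:00:00Z"));
// The rollup half is then queried on minute boundaries.
console.log(new Date(floorToMinute(Date.parse("2024-05-01T10:32:41Z"))).toISOString());
```

In the actual service the rollup half is additionally cached in Redis under `ROLLUP_CACHE_PREFIX`, so repeated wide-range queries stay cheap.
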
diff --git a/backend/package-lock.json b/backend/package-lock.json index 383e889..361b48c 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -22,6 +22,7 @@ "aws-msk-iam-sasl-signer-js": "^1.0.1", "class-transformer": "^0.5.1", "class-validator": "^0.14.2", + "date-fns-tz": "^3.2.0", "dotenv": "^17.2.3", "ioredis": "^5.8.2", "kafkajs": "^2.2.4", @@ -7951,6 +7952,26 @@ "node": ">= 8" } }, + "node_modules/date-fns": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/date-fns/-/date-fns-4.1.0.tgz", + "integrity": "sha512-Ukq0owbQXxa/U3EGtsdVBkR1w7KOQ5gIBqdH2hkvknzZPYvBxb/aa6E8L7tmjFtkwZBu3UXBbjIgPo/Ez4xaNg==", + "license": "MIT", + "peer": true, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/kossnocorp" + } + }, + "node_modules/date-fns-tz": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/date-fns-tz/-/date-fns-tz-3.2.0.tgz", + "integrity": "sha512-sg8HqoTEulcbbbVXeg84u5UnlsQa8GS5QXMqjjYIhS4abEVVKIUwe0/l/UhrZdKaL/W5eWZNlbTeEIiOXTcsBQ==", + "license": "MIT", + "peerDependencies": { + "date-fns": "^3.0.0 || ^4.0.0" + } + }, "node_modules/debug": { "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", diff --git a/backend/package.json b/backend/package.json index df3463a..fabda15 100644 --- a/backend/package.json +++ b/backend/package.json @@ -48,6 +48,7 @@ "aws-msk-iam-sasl-signer-js": "^1.0.1", "class-transformer": "^0.5.1", "class-validator": "^0.14.2", + "date-fns-tz": "^3.2.0", "dotenv": "^17.2.3", "ioredis": "^5.8.2", "kafkajs": "^2.2.4", diff --git a/backend/src/aggregator/rollup-config.service.ts b/backend/src/aggregator/rollup-config.service.ts index 3561613..80444fd 100644 --- a/backend/src/aggregator/rollup-config.service.ts +++ b/backend/src/aggregator/rollup-config.service.ts @@ -6,24 +6,29 @@ import { Injectable } from "@nestjs/common"; */ @Injectable() export class RollupConfigService { + // Aggregator 전체 on/off 스위치 (기본 true) private readonly enabled = (process.env.ROLLUP_AGGREGATOR_ENABLED ?? "true").toLowerCase() === "true"; + // 롤업 버킷 길이(초). 60초 = 1분 버킷 private readonly bucketSeconds = this.parseNumber( process.env.ROLLUP_BUCKET_SECONDS, 60, ); + // 닫힌 분을 확인하는 주기(ms). 짧을수록 최신 분을 빨리 처리한다. private readonly pollIntervalMs = this.parseNumber( process.env.ROLLUP_POLL_INTERVAL_MS, 15_000, ); + // 체크포인트가 없을 때 과거 몇 분까지 되돌아갈지 결정 private readonly initialLookbackMinutes = this.parseNumber( process.env.ROLLUP_INITIAL_LOOKBACK_MINUTES, 5, ); + // 단일 분에 등장하는 서비스 수 상한(terms size). 너무 많으면 캐시/샤드 압박이 있다. private readonly maxServiceBuckets = this.parseNumber( process.env.ROLLUP_MAX_SERVICE_BUCKETS, 200, @@ -34,9 +39,11 @@ export class RollupConfigService { 10, ); + // lastRolledUpAt 을 저장하는 전용 인덱스 이름 private readonly checkpointIndex = process.env.ROLLUP_CHECKPOINT_INDEX ?? ".metrics-rollup-state"; + // Aggregator가 데이터를 쓰는 롤업 데이터 스트림 접두사 private readonly rollupIndexPrefix = process.env.ROLLUP_INDEX_PREFIX ?? 
"metrics-apm"; diff --git a/backend/src/aggregator/rollup-metrics.repository.ts b/backend/src/aggregator/rollup-metrics.repository.ts index 1d636a2..26c6919 100644 --- a/backend/src/aggregator/rollup-metrics.repository.ts +++ b/backend/src/aggregator/rollup-metrics.repository.ts @@ -1,7 +1,7 @@ import { Injectable, Logger } from "@nestjs/common"; import type { Client } from "@elastic/elasticsearch"; import { LogStorageService } from "../shared/logs/log-storage.service"; -import type { RollupMetricDocument } from "./types/rollup-metric-document"; +import type { RollupMetricDocument } from "../shared/apm/rollup/rollup-metric.document"; /** * 롤업 결과를 metrics data stream 에 저장하는 책임을 가지는 레포지토리 diff --git a/backend/src/aggregator/span-minute-aggregation.service.ts b/backend/src/aggregator/span-minute-aggregation.service.ts index 9ceebf3..a598bbd 100644 --- a/backend/src/aggregator/span-minute-aggregation.service.ts +++ b/backend/src/aggregator/span-minute-aggregation.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger } from "@nestjs/common"; import type { Client } from "@elastic/elasticsearch"; import { LogStorageService } from "../shared/logs/log-storage.service"; import type { MinuteWindow } from "./types/minute-window.type"; -import type { RollupMetricDocument } from "./types/rollup-metric-document"; +import type { RollupMetricDocument } from "../shared/apm/rollup/rollup-metric.document"; import { RollupConfigService } from "./rollup-config.service"; const UNKNOWN_SERVICE = "unknown-service"; @@ -103,11 +103,13 @@ export class SpanMinuteAggregationService { []; const bucketDocs: RollupMetricDocument[] = []; + let totalSpanCount = 0; const ingestedAt = new Date().toISOString(); const bucketDurationSeconds = this.config.getBucketDurationSeconds(); for (const serviceBucket of services) { const serviceName = serviceBucket.key || UNKNOWN_SERVICE; + totalSpanCount += serviceBucket.doc_count ?? 0; for (const envBucket of serviceBucket.environments.buckets) { const environment = envBucket.key || UNKNOWN_ENVIRONMENT; const total = envBucket.doc_count ?? 0; @@ -147,11 +149,11 @@ export class SpanMinuteAggregationService { if (bucketDocs.length === 0) { this.logger.log( - `집계 대상 스팬이 없어 비어 있는 분을 건너뜁니다. window=${window.start.toISOString()}~${window.end.toISOString()} es_took=${took}ms`, + `집계 대상 스팬이 없어 비어 있는 분을 건너뜁니다. window=${window.start.toISOString()}~${window.end.toISOString()} spans=0 es_took=${took}ms`, ); } else { this.logger.log( - `스팬 집계 완료 window=${window.start.toISOString()}~${window.end.toISOString()} services=${services.length} docs=${bucketDocs.length} es_took=${took}ms`, + `스팬 집계 완료 window=${window.start.toISOString()}~${window.end.toISOString()} services=${services.length} docs=${bucketDocs.length} spans=${totalSpanCount} es_took=${took}ms`, ); } diff --git a/backend/src/aggregator/window-planner.service.ts b/backend/src/aggregator/window-planner.service.ts index 594f77b..4ad56ec 100644 --- a/backend/src/aggregator/window-planner.service.ts +++ b/backend/src/aggregator/window-planner.service.ts @@ -53,6 +53,7 @@ export class MinuteWindowPlanner { if (Number.isNaN(timestamp) || timestamp <= 0) { return null; } + // 현재 시각을 버킷 크기로 나눈 뒤 내림하여 "완전히 종료된 분"까지만 바라본다. return timestamp - (timestamp % bucketMs); } @@ -61,6 +62,7 @@ export class MinuteWindowPlanner { if (remainder === 0) { return timestamp; } + // 특정 시각을 가장 가까운 버킷 시작점으로 내림 정렬한다. 
return timestamp - remainder; } } diff --git a/backend/src/query-api/service-metrics/service-metrics.service.ts b/backend/src/query-api/service-metrics/service-metrics.service.ts index 6e6bfc6..6f78a0a 100644 --- a/backend/src/query-api/service-metrics/service-metrics.service.ts +++ b/backend/src/query-api/service-metrics/service-metrics.service.ts @@ -1,6 +1,9 @@ import { Injectable, Logger } from "@nestjs/common"; +import { formatInTimeZone } from "date-fns-tz"; import { SpanRepository } from "../../shared/apm/spans/span.repository"; import type { ServiceMetricBucket } from "../../shared/apm/spans/span.repository"; +import { RollupMetricsReadRepository } from "../../shared/apm/rollup/rollup-metrics-read.repository"; +import type { RollupMetricDocument } from "../../shared/apm/rollup/rollup-metric.document"; import type { MetricResponse, AggregationProfiler, @@ -20,12 +23,37 @@ export class ServiceMetricsService { private readonly logger = new Logger(ServiceMetricsService.name); private readonly profilingEnabled = (process.env.SERVICE_METRICS_PROFILE ?? "false").toLowerCase() === "true"; + // 롤업 전략/캐시 관련 환경 변수 + private readonly rollupEnabled = + (process.env.ROLLUP_ENABLED ?? "true").toLowerCase() === "true"; + private readonly rollupThresholdMs = + this.minutesToMs(Number(process.env.ROLLUP_THRESHOLD_MINUTES ?? "5")) || + 5 * 60 * 1000; + private readonly rollupBucketMs = Math.max( + 60 * 1000, + this.minutesToMs(Number(process.env.ROLLUP_BUCKET_MINUTES ?? "1")), + ); + private readonly rollupCacheTtlSeconds = Math.max( + 0, + Number(process.env.ROLLUP_CACHE_TTL_SECONDS ?? "60"), + ); + private readonly rollupCachePrefix = + process.env.ROLLUP_CACHE_PREFIX ?? "apm:metrics-rollup"; + private readonly maxRollupBuckets = Math.max( + 500, + Number(process.env.ROLLUP_MAX_QUERY_BUCKETS ?? "43200"), + ); constructor( private readonly spanRepository: SpanRepository, + private readonly rollupRepository: RollupMetricsReadRepository, private readonly queryNormalizer: MetricsQueryNormalizerService, private readonly metricsCache: MetricsCacheService, - ) {} + ) { + this.logger.log( + `롤업 조회 설정: enabled=${this.rollupEnabled} thresholdMinutes=${this.rollupThresholdMs / 60000} bucketMinutes=${this.rollupBucketMs / 60000}`, + ); + } async getMetrics( serviceName: string, @@ -44,19 +72,37 @@ export class ServiceMetricsService { const cached = await this.metricsCache.get(cacheKey); if (cached) { this.logger.debug( - `서비스 메트릭 캐시 히트 service=${normalized.serviceName} env=${normalized.environment ?? "all"} from=${normalized.from} to=${normalized.to} interval=${normalized.interval}`, + `서비스 메트릭 캐시 히트 service=${normalized.serviceName} env=${normalized.environment ?? "all"} from=${this.formatTimestamp(normalized.from)} to=${this.formatTimestamp(normalized.to)} interval=${normalized.interval}`, ); return JSON.parse(cached) as MetricResponse[]; } this.logger.debug( - `서비스 메트릭 캐시 미스 service=${normalized.serviceName} env=${normalized.environment ?? "all"} from=${normalized.from} to=${normalized.to} interval=${normalized.interval}`, + `서비스 메트릭 캐시 미스 service=${normalized.serviceName} env=${normalized.environment ?? "all"} from=${this.formatTimestamp(normalized.from)} to=${this.formatTimestamp(normalized.to)} interval=${normalized.interval}`, ); } + const plan = this.buildFetchPlan(normalized); + this.logger.debug( + plan.rollupWindow + ? `롤업 구간 적용 service=${normalized.serviceName} rollupWindow=${this.formatTimestamp(plan.rollupWindow.from)}~${this.formatTimestamp(plan.rollupWindow.to)} rawWindow=${plan.rawWindow ? 
`${this.formatTimestamp(plan.rawWindow.from)}~${this.formatTimestamp(plan.rawWindow.to)}` : "없음"}` + : `롤업 미적용 service=${normalized.serviceName} window=${this.formatTimestamp(normalized.from)}~${this.formatTimestamp(normalized.to)}`, + ); + const rollupBuckets = plan.rollupWindow + ? await this.fetchRollupBuckets(normalized, plan.rollupWindow) + : []; + const rollupBucketCount = rollupBuckets.length; + this.logger.debug( + `롤업 버킷 조회 결과 service=${normalized.serviceName} buckets=${rollupBucketCount}`, + ); + const rawBuckets = plan.rawWindow + ? await this.fetchRawBuckets(normalized, plan.rawWindow) + : []; + this.logger.debug( + `RAW 버킷 조회 결과 service=${normalized.serviceName} buckets=${rawBuckets.length}`, + ); + const buckets = this.mergeMetricBuckets(rollupBuckets, rawBuckets); profiler?.mark("es_query"); - const params = this.toMetricParams(normalized); - const buckets = await this.spanRepository.aggregateServiceMetrics(params); const metrics = this.toMetricResponses( normalized.serviceName, normalized.environment, @@ -67,6 +113,15 @@ export class ServiceMetricsService { ? metrics.filter((item) => item.metric_name === normalized.metric) : metrics; + const totalBuckets = buckets.length; + const totalRequests = buckets.reduce( + (sum, bucket) => sum + bucket.total, + 0, + ); + this.logger.log( + `메트릭 조회 요약 service=${normalized.serviceName} env=${normalized.environment ?? "all"} window=${this.formatTimestamp(normalized.from)}~${this.formatTimestamp(normalized.to)} rollupBuckets=${rollupBucketCount} rawBuckets=${rawBuckets.length} totalBuckets=${totalBuckets} totalRequests=${totalRequests}`, + ); + profiler?.mark("response_ready"); profiler?.logSummary(filteredMetrics.length); @@ -79,24 +134,8 @@ export class ServiceMetricsService { } /** - * 캐시 로직과 ES 검색 파라미터 변환을 분리해 SOLID 원칙을 지킨다. + * raw/rollup 버킷을 공통 메트릭 응답 구조로 변환한다. */ - private toMetricParams(normalized: NormalizedServiceMetricsQuery): { - serviceName: string; - environment?: string; - from: string; - to: string; - interval: string; - } { - return { - serviceName: normalized.serviceName, - environment: normalized.environment, - from: normalized.from, - to: normalized.to, - interval: normalized.interval, - }; - } - private toMetricResponses( serviceName: string, environment: string | undefined, @@ -166,10 +205,238 @@ export class ServiceMetricsService { ]; } + /** + * 환경 레이블을 통일된 형태로 반환한다. + */ private baseLabels(environment?: string): Record | undefined { return environment ? { environment } : undefined; } + /** + * RAW 집계 대상 구간에 대해 기존 스팬 집계 쿼리를 실행한다. + */ + private async fetchRawBuckets( + normalized: NormalizedServiceMetricsQuery, + window: MetricsWindow | null, + ): Promise { + if (!window || window.from === window.to) { + return []; + } + return this.spanRepository.aggregateServiceMetrics({ + serviceName: normalized.serviceName, + environment: normalized.environment, + from: window.from, + to: window.to, + interval: normalized.interval, + }); + } + + /** + * 롤업 데이터 스트림에서 1분 버킷을 조회하고 ServiceMetricBucket 형태로 변환한다. + * - Redis 캐시가 켜져 있으면 window 범위 전체를 캐시에 저장한다. 
+ */ + private async fetchRollupBuckets( + normalized: NormalizedServiceMetricsQuery, + window: MetricsWindow | null, + ): Promise { + if ( + !window || + !this.rollupEnabled || + window.from === window.to || + this.rollupBucketMs <= 0 + ) { + return []; + } + + const cacheKey = + this.rollupCacheAvailable() && + this.buildRollupCacheKey(normalized, window); + if (cacheKey) { + const cached = await this.metricsCache.get(cacheKey); + if (cached) { + return JSON.parse(cached) as ServiceMetricBucket[]; + } + } + + const fromMs = Date.parse(window.from); + const toMs = Date.parse(window.to); + if (!Number.isFinite(fromMs) || !Number.isFinite(toMs) || toMs <= fromMs) { + return []; + } + + // 롤업 문서는 1분 버킷이므로 검색 구간을 버킷 경계로 정규화한다. + const searchFrom = this.floorToBucket(fromMs); + const searchTo = this.floorToBucket(toMs); + if (searchTo <= searchFrom) { + return []; + } + + const expectedBuckets = Math.min( + this.maxRollupBuckets, + Math.max(1, Math.ceil((searchTo - searchFrom) / this.rollupBucketMs) + 5), + ); + const documents = await this.rollupRepository.search({ + serviceName: normalized.serviceName, + environment: normalized.environment, + from: new Date(searchFrom).toISOString(), + to: new Date(searchTo).toISOString(), + size: expectedBuckets, + }); + const lowerBound = fromMs; + const upperBound = toMs; + const buckets = documents + .map((doc) => this.mapRollupDocument(doc)) + .filter((bucket) => { + const ts = Date.parse(bucket.timestamp); + return ts >= lowerBound && ts < upperBound; + }); + + if (cacheKey) { + await this.metricsCache.set( + cacheKey, + JSON.stringify(buckets), + this.rollupCacheTtlSeconds, + ); + } + + return buckets; + } + + /** + * 롤업 버킷과 RAW 버킷을 timestamp 기준으로 병합해 시간순 정렬을 유지한다. + */ + private mergeMetricBuckets( + rollupBuckets: ServiceMetricBucket[], + rawBuckets: ServiceMetricBucket[], + ): ServiceMetricBucket[] { + const merged = [...rollupBuckets, ...rawBuckets]; + if (merged.length <= 1) { + return merged; + } + return merged.sort( + (a, b) => + new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(), + ); + } + + /** + * 조회 범위 길이에 따라 롤업/RAW 윈도우를 나눈다. + */ + private buildFetchPlan( + normalized: NormalizedServiceMetricsQuery, + ): MetricsFetchPlan { + if (!this.rollupEnabled) { + return { + rollupWindow: null, + rawWindow: { from: normalized.from, to: normalized.to }, + }; + } + + const fromMs = Date.parse(normalized.from); + const toMs = Date.parse(normalized.to); + if (!Number.isFinite(fromMs) || !Number.isFinite(toMs) || toMs <= fromMs) { + return { + rollupWindow: null, + rawWindow: { from: normalized.from, to: normalized.to }, + }; + } + + if (toMs - fromMs <= this.rollupThresholdMs) { + return { + rollupWindow: null, + rawWindow: { from: normalized.from, to: normalized.to }, + }; + } + + // 최신 threshold 구간만 raw 데이터로 남기고, 이전 구간은 롤업 인덱스로 대체한다. + const splitPoint = toMs - this.rollupThresholdMs; + if (splitPoint <= fromMs) { + return { + rollupWindow: null, + rawWindow: { from: normalized.from, to: normalized.to }, + }; + } + + const rollupWindow: MetricsWindow = { + from: normalized.from, + to: new Date(splitPoint).toISOString(), + }; + + const rawWindow: MetricsWindow = { + from: rollupWindow.to, + to: normalized.to, + }; + return { + rollupWindow, + rawWindow, + }; + } + + /** + * 롤업 인덱스 문서를 raw 메트릭 버킷과 동일한 형태로 변환한다. + */ + private mapRollupDocument(doc: RollupMetricDocument): ServiceMetricBucket { + const total = typeof doc.request_count === "number" ? doc.request_count : 0; + const errors = typeof doc.error_count === "number" ? 
doc.error_count : 0; + const errorRate = + typeof doc.error_rate === "number" + ? doc.error_rate + : total > 0 + ? errors / total + : 0; + + return { + timestamp: doc["@timestamp_bucket"], + total, + errorRate, + p95Latency: Number(doc.latency_p95_ms ?? 0), + p90Latency: Number(doc.latency_p90_ms ?? 0), + p50Latency: Number(doc.latency_p50_ms ?? 0), + }; + } + + /** + * 롤업 결과 캐시 키를 window 기준으로 생성한다. + */ + private buildRollupCacheKey( + normalized: NormalizedServiceMetricsQuery, + window: MetricsWindow, + ): string { + return [ + this.rollupCachePrefix, + `service:${normalized.serviceName}`, + `env:${normalized.environment ?? "all"}`, + `metric:${normalized.metric ?? "all"}`, + `from:${window.from}`, + `to:${window.to}`, + ].join("|"); + } + + /** + * 롤업 캐시를 사용할 수 있는지(TTL>0, Redis 활성 상태) 확인한다. + */ + private rollupCacheAvailable(): boolean { + return this.rollupCacheTtlSeconds > 0 && this.metricsCache.isEnabled(); + } + + /** + * timestamp를 롤업 버킷 크기에 맞춰 내림 정렬한다. + */ + private floorToBucket(timestampMs: number): number { + if (!Number.isFinite(timestampMs) || timestampMs <= 0) { + return 0; + } + return Math.floor(timestampMs / this.rollupBucketMs) * this.rollupBucketMs; + } + + /** + * 분 단위 입력 값을 밀리초로 변환한다. + */ + private minutesToMs(value: number): number { + const safe = Number.isFinite(value) && value > 0 ? value : 0; + return safe * 60 * 1000; + } + private createProfiler( normalized: NormalizedServiceMetricsQuery, ): AggregationProfiler | null { @@ -183,15 +450,29 @@ export class ServiceMetricsService { const label = event === "es_query" ? "ES 집계 완료" : "응답 데이터 정리"; this.logger.debug( - `메트릭 성능(${label}) service=${normalized.serviceName} env=${normalized.environment ?? "all"} from=${normalized.from} to=${normalized.to} elapsed=${elapsed}ms`, + `메트릭 성능(${label}) service=${normalized.serviceName} env=${normalized.environment ?? "all"} from=${this.formatTimestamp(normalized.from)} to=${this.formatTimestamp(normalized.to)} elapsed=${elapsed}ms`, ); }, logSummary: (responseLength) => { const elapsed = Date.now() - startedAt; this.logger.log( - `메트릭 총 소요 service=${normalized.serviceName} env=${normalized.environment ?? "all"} window=${normalized.from}~${normalized.to} metrics=${responseLength}건 elapsed=${elapsed}ms`, + `메트릭 총 소요 service=${normalized.serviceName} env=${normalized.environment ?? 
"all"} window=${this.formatTimestamp(normalized.from)}~${this.formatTimestamp(normalized.to)} metrics=${responseLength}건 elapsed=${elapsed}ms`, ); }, }; } + + private formatTimestamp(value: string): string { + return formatInTimeZone(value, "Asia/Seoul", "MM-dd HH:mm:ss"); + } +} + +interface MetricsWindow { + from: string; + to: string; +} + +interface MetricsFetchPlan { + rollupWindow: MetricsWindow | null; + rawWindow: MetricsWindow | null; } diff --git a/backend/src/shared/apm/apm.module.ts b/backend/src/shared/apm/apm.module.ts index a891484..a04e66a 100644 --- a/backend/src/shared/apm/apm.module.ts +++ b/backend/src/shared/apm/apm.module.ts @@ -2,11 +2,12 @@ import { Global, Module } from "@nestjs/common"; import { LogInfrastructureModule } from "../logs/logs.module"; import { ApmLogRepository } from "./logs/log.repository"; import { SpanRepository } from "./spans/span.repository"; +import { RollupMetricsReadRepository } from "./rollup/rollup-metrics-read.repository"; @Global() @Module({ imports: [LogInfrastructureModule], - providers: [ApmLogRepository, SpanRepository], - exports: [ApmLogRepository, SpanRepository], + providers: [ApmLogRepository, SpanRepository, RollupMetricsReadRepository], + exports: [ApmLogRepository, SpanRepository, RollupMetricsReadRepository], }) export class ApmInfrastructureModule {} diff --git a/backend/src/shared/apm/common/environment.util.ts b/backend/src/shared/apm/common/environment.util.ts new file mode 100644 index 0000000..e46940a --- /dev/null +++ b/backend/src/shared/apm/common/environment.util.ts @@ -0,0 +1,29 @@ +/** + * 환경 문자열을 공통 규칙에 맞춰 정규화한다. + * - 사용자 입력은 대소문자/별칭(prod, production 등)을 허용한다. + */ +export function normalizeEnvironmentFilter( + environment?: string, +): string | undefined { + if (!environment) { + return undefined; + } + const trimmed = environment.trim(); + if (!trimmed) { + return undefined; + } + + const key = trimmed.toLowerCase(); + const alias: Record = { + prod: "production", + production: "production", + dev: "development", + development: "development", + stage: "staging", + staging: "staging", + qa: "qa", + test: "test", + }; + + return alias[key] ?? trimmed; +} diff --git a/backend/src/aggregator/types/rollup-metric-document.ts b/backend/src/shared/apm/rollup/rollup-metric.document.ts similarity index 80% rename from backend/src/aggregator/types/rollup-metric-document.ts rename to backend/src/shared/apm/rollup/rollup-metric.document.ts index 772b9da..d53f6f8 100644 --- a/backend/src/aggregator/types/rollup-metric-document.ts +++ b/backend/src/shared/apm/rollup/rollup-metric.document.ts @@ -1,6 +1,6 @@ /** - * 롤업 된 APM 메트릭 문서 스키마 - * - Elasticsearch data stream(metrics-apm.*)에 그대로 저장된다. + * 롤업된 APM 메트릭 문서 스키마 + * - Query API와 Aggregator가 함께 참조한다. 
*/ export interface RollupMetricDocument extends Record { "@timestamp": string; diff --git a/backend/src/shared/apm/rollup/rollup-metrics-read.repository.ts b/backend/src/shared/apm/rollup/rollup-metrics-read.repository.ts new file mode 100644 index 0000000..be6d7ca --- /dev/null +++ b/backend/src/shared/apm/rollup/rollup-metrics-read.repository.ts @@ -0,0 +1,65 @@ +import { Injectable } from "@nestjs/common"; +import type { Client } from "@elastic/elasticsearch"; +import { LogStorageService } from "../../logs/log-storage.service"; +import { normalizeEnvironmentFilter } from "../common/environment.util"; +import type { RollupMetricDocument } from "./rollup-metric.document"; + +export interface RollupMetricsSearchParams { + serviceName: string; + environment?: string; + from: string; + to: string; + size: number; +} + +/** + * Query API가 `metrics-apm` 데이터 스트림을 읽어오는 전용 레포지토리 + */ +@Injectable() +export class RollupMetricsReadRepository { + private readonly client: Client; + private readonly dataStream: string; + + constructor(storage: LogStorageService) { + this.client = storage.getClient(); + this.dataStream = storage.getDataStream("apmRollupMetrics"); + } + + async search( + params: RollupMetricsSearchParams, + ): Promise { + const filter: Array> = [ + { term: { service_name: params.serviceName } }, + { + range: { + "@timestamp_bucket": { + gte: params.from, + lt: params.to, + }, + }, + }, + ]; + + const env = normalizeEnvironmentFilter(params.environment); + if (env) { + filter.push({ term: { environment: env } }); + } + + const response = await this.client.search({ + index: this.dataStream, + size: Math.max(1, params.size), + sort: [{ "@timestamp_bucket": { order: "asc" as const } }], + query: { + bool: { + filter, + }, + }, + }); + + return response.hits.hits + .filter((hit): hit is typeof hit & { _source: RollupMetricDocument } => + Boolean(hit._source), + ) + .map((hit) => hit._source); + } +} diff --git a/backend/src/shared/apm/spans/span.repository.ts b/backend/src/shared/apm/spans/span.repository.ts index eba5cb2..74910be 100644 --- a/backend/src/shared/apm/spans/span.repository.ts +++ b/backend/src/shared/apm/spans/span.repository.ts @@ -5,6 +5,7 @@ import { } from "../common/base-apm.repository"; import type { SpanDocument } from "./span.document"; import { LogStorageService } from "../../logs/log-storage.service"; +import { normalizeEnvironmentFilter } from "../common/environment.util"; export interface SpanSearchParams { traceId: string; @@ -132,30 +133,6 @@ export class SpanRepository extends BaseApmRepository { super(storage, SpanRepository.STREAM_KEY); } - private normalizeEnvironmentFilter(environment?: string): string | undefined { - if (!environment) { - return undefined; - } - const trimmed = environment.trim(); - if (!trimmed) { - return undefined; - } - - const key = trimmed.toLowerCase(); - const alias: Record = { - prod: "production", - production: "production", - dev: "development", - development: "development", - stage: "staging", - staging: "staging", - qa: "qa", - test: "test", - }; - - return alias[key] ?? 
trimmed; - } - private buildTimeRangeFilter(from: string, to: string) { return { range: { @@ -196,7 +173,7 @@ export class SpanRepository extends BaseApmRepository { if (params.serviceName) { filter.push({ term: { service_name: params.serviceName } }); } - const normalizedEnv = this.normalizeEnvironmentFilter(params.environment); + const normalizedEnv = normalizeEnvironmentFilter(params.environment); if (normalizedEnv) { filter.push({ term: { environment: normalizedEnv } }); } @@ -229,9 +206,7 @@ export class SpanRepository extends BaseApmRepository { async aggregateServiceMetrics( params: ServiceMetricQuery, ): Promise { - const environmentFilter = this.normalizeEnvironmentFilter( - params.environment, - ); + const environmentFilter = normalizeEnvironmentFilter(params.environment); const response = await this.client.search({ index: this.dataStream, size: 0, @@ -326,9 +301,7 @@ export class SpanRepository extends BaseApmRepository { async aggregateServiceOverview( params: ServiceOverviewParams, ): Promise { - const environmentFilter = this.normalizeEnvironmentFilter( - params.environment, - ); + const environmentFilter = normalizeEnvironmentFilter(params.environment); const response = await this.client.search({ index: this.dataStream, size: 0, @@ -430,9 +403,7 @@ export class SpanRepository extends BaseApmRepository { async aggregateEndpointMetrics( params: EndpointMetricsParams, ): Promise { - const environmentFilter = this.normalizeEnvironmentFilter( - params.environment, - ); + const environmentFilter = normalizeEnvironmentFilter(params.environment); const response = await this.client.search({ index: this.dataStream, size: 0, @@ -521,9 +492,7 @@ export class SpanRepository extends BaseApmRepository { async findRecentTracesByEndpoint( params: EndpointTraceQueryParams, ): Promise { - const environmentFilter = this.normalizeEnvironmentFilter( - params.environment, - ); + const environmentFilter = normalizeEnvironmentFilter(params.environment); const filters: Array> = [ { term: { service_name: params.serviceName } }, { term: { kind: "SERVER" } }, @@ -631,7 +600,7 @@ export class SpanRepository extends BaseApmRepository { params: SpanListQuery, ): Promise> { const from = (params.page - 1) * params.size; - const normalizedEnv = this.normalizeEnvironmentFilter(params.environment); + const normalizedEnv = normalizeEnvironmentFilter(params.environment); const filters: Array> = [ this.buildTimeRangeFilter(params.from, params.to), ...(normalizedEnv ? [{ term: { environment: normalizedEnv } }] : []), @@ -696,7 +665,7 @@ export class SpanRepository extends BaseApmRepository { params: ServiceTraceSearchParams, ): Promise> { const from = (params.page - 1) * params.size; - const normalizedEnv = this.normalizeEnvironmentFilter(params.environment); + const normalizedEnv = normalizeEnvironmentFilter(params.environment); const filters: Array> = [ { term: { service_name: params.serviceName } }, { term: { kind: "SERVER" } },
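
For reference, a short usage sketch of the shared `normalizeEnvironmentFilter` helper that this diff extracts into `environment.util.ts` and reuses in both `SpanRepository` and `RollupMetricsReadRepository`. The standalone-script framing and the relative import path are assumptions for illustration only.

```ts
// Assumed to run as a one-off script placed next to backend/src/shared/apm/common/;
// adjust the relative path to wherever you put it.
import { normalizeEnvironmentFilter } from "./environment.util";

// Aliases collapse to canonical names; unknown values pass through trimmed,
// and empty or whitespace-only input is dropped entirely.
console.log(normalizeEnvironmentFilter("PROD"));    // "production"
console.log(normalizeEnvironmentFilter(" stage ")); // "staging"
console.log(normalizeEnvironmentFilter("eu-west")); // "eu-west"
console.log(normalizeEnvironmentFilter("   "));     // undefined
```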