Merged
Binary file modified .DS_Store
Binary file not shown.
3 changes: 1 addition & 2 deletions .github/workflows/cd.yml
@@ -3,8 +3,7 @@ name: CD - Build & Deploy
on:
push:
branches:
- feature/bulk-v3
- feat/producer-rps-optimization
- main
workflow_dispatch:

env:
44 changes: 0 additions & 44 deletions README.md

This file was deleted.

25 changes: 25 additions & 0 deletions backend/src/shared/common/kafka/kafka.config.ts
@@ -60,6 +60,18 @@ interface KafkaMicroserviceParams {

type KafkaSecurityOverrides = Pick<KafkaConfig, "ssl" | "sasl">;

// Default fetch option values (used when the environment variables are not set)
const DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024; // 50MB
const DEFAULT_FETCH_MAX_BYTES_PER_PARTITION = 10 * 1024 * 1024; // 10MB
const DEFAULT_FETCH_MIN_BYTES = 1 * 1024 * 1024; // 1MB
const DEFAULT_FETCH_MAX_WAIT_MS = 50;

// Extract only positive integer environment variables so they can be applied safely to the Kafka fetch options.
function parsePositiveInt(value: string | undefined): number | undefined {
const parsed = Number.parseInt(value ?? "", 10);
return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined;
}

function buildKafkaSecurityConfig(): KafkaSecurityOverrides {
const sslEnabled = process.env.KAFKA_SSL === "true";
const sslRejectUnauthorized =
@@ -147,6 +159,19 @@ export function createKafkaMicroserviceOptions(
process.env.KAFKA_CONCURRENT_PARTITIONS ?? "3",
10,
),
// Tune the fetch size/wait time via environment variables so more records can be pulled per fetch.
maxBytes:
parsePositiveInt(process.env.KAFKA_FETCH_MAX_BYTES) ??
DEFAULT_FETCH_MAX_BYTES,
maxBytesPerPartition:
parsePositiveInt(process.env.KAFKA_FETCH_MAX_BYTES_PER_PARTITION) ??
DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
minBytes:
parsePositiveInt(process.env.KAFKA_FETCH_MIN_BYTES) ??
DEFAULT_FETCH_MIN_BYTES,
maxWaitTimeInMs:
parsePositiveInt(process.env.KAFKA_FETCH_MAX_WAIT_MS) ??
DEFAULT_FETCH_MAX_WAIT_MS,
},
},
};
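For reference, a minimal sketch of how the new helper resolves the fetch options: a valid positive integer from the environment wins, and anything missing or malformed falls back to the defaults declared above. The helper and default mirror the diff; the example input values are made up.

// Sketch of the fallback behavior of parsePositiveInt (copied from the diff).
function parsePositiveInt(value: string | undefined): number | undefined {
  const parsed = Number.parseInt(value ?? "", 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined;
}

const DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024; // 50MB

// A valid positive integer is used as-is.
console.log(parsePositiveInt("10485760") ?? DEFAULT_FETCH_MAX_BYTES); // 10485760

// Missing, non-numeric, zero, or negative values fall back to the default.
console.log(parsePositiveInt(undefined) ?? DEFAULT_FETCH_MAX_BYTES); // 52428800
console.log(parsePositiveInt("not-a-number") ?? DEFAULT_FETCH_MAX_BYTES); // 52428800
console.log(parsePositiveInt("-1") ?? DEFAULT_FETCH_MAX_BYTES); // 52428800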
6 changes: 3 additions & 3 deletions backend/src/stream-processor/common/bulk-indexer.service.ts
@@ -37,10 +37,10 @@ export class BulkIndexerService implements OnModuleDestroy {
this.client = this.storage.getClient();
this.maxBatchSize = Math.max(
1,
Number.parseInt(process.env.BULK_BATCH_SIZE ?? "1000", 10),
Number.parseInt(process.env.BULK_BATCH_SIZE ?? "6000", 10),
);
const byteLimitMb = Number.parseFloat(
process.env.BULK_BATCH_BYTES_MB ?? "10",
process.env.BULK_BATCH_BYTES_MB ?? "32",
);
this.maxBatchBytes = Math.max(1024, Math.floor(byteLimitMb * 1024 * 1024));
this.flushIntervalMs = Math.max(
@@ -49,7 +49,7 @@
);
this.maxParallelFlushes = Math.max(
1,
Number.parseInt(process.env.BULK_MAX_PARALLEL_FLUSHES ?? "3", 10),
Number.parseInt(process.env.BULK_MAX_PARALLEL_FLUSHES ?? "6", 10),
);
}

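As a quick sanity check on the new bulk-indexer defaults, a minimal sketch of how the constructor arithmetic resolves when no environment overrides are present; it mirrors the logic in the diff and uses the same env variable names.

// With no env overrides, the new defaults resolve as follows.
const maxBatchSize = Math.max(
  1,
  Number.parseInt(process.env.BULK_BATCH_SIZE ?? "6000", 10),
); // 6000 items per bulk batch

const byteLimitMb = Number.parseFloat(process.env.BULK_BATCH_BYTES_MB ?? "32");
const maxBatchBytes = Math.max(1024, Math.floor(byteLimitMb * 1024 * 1024));
// 32 * 1024 * 1024 = 33,554,432 bytes (~32MB) per batch

const maxParallelFlushes = Math.max(
  1,
  Number.parseInt(process.env.BULK_MAX_PARALLEL_FLUSHES ?? "6", 10),
); // up to 6 concurrent bulk flushes

console.log({ maxBatchSize, maxBatchBytes, maxParallelFlushes });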