diff --git a/services/rotor/src/lib/kafka-config.ts b/services/rotor/src/lib/kafka-config.ts index 138e15c37..0c5c619a0 100644 --- a/services/rotor/src/lib/kafka-config.ts +++ b/services/rotor/src/lib/kafka-config.ts @@ -2,7 +2,8 @@ import { Kafka, logLevel, CompressionCodecs, CompressionTypes } from "kafkajs"; import SnappyCodec from "kafkajs-snappy"; import "@sensejs/kafkajs-zstd-support"; -import { LogMessageBuilder, requireDefined, randomId, getLog } from "juava"; +import { readFileSync } from "fs"; +import { isTruish, LogMessageBuilder, requireDefined, randomId, getLog } from "juava"; import JSON5 from "json5"; const log = getLog("kafka"); @@ -25,7 +26,7 @@ function translateLevel(l: logLevel): LogMessageBuilder { export type KafkaCredentials = { brokers: string[] | string; - ssl?: boolean; + ssl?: boolean | Record; sasl?: { mechanism: "scram-sha-256" | "scram-sha-512"; username: string; @@ -34,9 +35,37 @@ export type KafkaCredentials = { }; export function getCredentialsFromEnv(): KafkaCredentials { + const ssl = isTruish(process.env.KAFKA_SSL); + const sslSkipVerify = isTruish(process.env.KAFKA_SSL_SKIP_VERIFY); + + let sslOption: KafkaCredentials["ssl"] = undefined; + + if (ssl) { + if (sslSkipVerify) { + // TLS enabled, but server TLS certificate is not verified + sslOption = { + rejectUnauthorized: false, + checkServerIdentity: () => undefined, + }; + } else if (process.env.KAFKA_SSL_CA) { + // TLS enabled, server TLS certificate is verified using a custom CA certificate + sslOption = { + ca: process.env.KAFKA_SSL_CA, + }; + } else if (process.env.KAFKA_SSL_CA_FILE) { + // TLS enabled, server TLS certificate is verified using a custom CA certificate (loaded from a local file) + sslOption = { + ca: readFileSync(process.env.KAFKA_SSL_CA_FILE, "utf-8"), + }; + } else { + // TLS enabled, no extra configurations + sslOption = true; + } + } + return { brokers: requireDefined(process.env.KAFKA_BOOTSTRAP_SERVERS, "env KAFKA_BOOTSTRAP_SERVERS is required").split(","), - ssl: process.env.KAFKA_SSL === "true" || process.env.KAFKA_SSL === "1", + ssl: sslOption, sasl: process.env.KAFKA_SASL ? JSON5.parse(process.env.KAFKA_SASL) : undefined, }; } @@ -57,13 +86,7 @@ export function connectToKafka(opts: { defaultAppId: string } & KafkaCredentials // }, clientId: process.env.APPLICATION_ID || opts.defaultAppId, brokers: typeof opts.brokers === "string" ? opts.brokers.split(",") : opts.brokers, - ssl: opts.ssl - ? { - rejectUnauthorized: false, - checkServerIdentity: () => undefined, - } - : undefined, - + ssl: opts.ssl, ...sasl, }); }