Skip to content

Commit

Permalink
rotor: Add support for custom CA certificate in Kafka SSL configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
dmeremyanin committed Jan 24, 2025
1 parent 901c174 commit f3dab7b
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions services/rotor/src/lib/kafka-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Kafka, logLevel, CompressionCodecs, CompressionTypes } from "kafkajs";
import SnappyCodec from "kafkajs-snappy";
import "@sensejs/kafkajs-zstd-support";

import { LogMessageBuilder, requireDefined, randomId, getLog } from "juava";
import { readFileSync } from "fs";
import { isTruish, LogMessageBuilder, requireDefined, randomId, getLog } from "juava";
import JSON5 from "json5";
const log = getLog("kafka");

Expand All @@ -25,7 +26,7 @@ function translateLevel(l: logLevel): LogMessageBuilder {

export type KafkaCredentials = {
brokers: string[] | string;
ssl?: boolean;
ssl?: boolean | Record<string, any>;
sasl?: {
mechanism: "scram-sha-256" | "scram-sha-512";
username: string;
Expand All @@ -34,9 +35,32 @@ export type KafkaCredentials = {
};

export function getCredentialsFromEnv(): KafkaCredentials {
const ssl = isTruish(process.env.KAFKA_SSL);
const sslSkipVerify = isTruish(process.env.KAFKA_SSL_SKIP_VERIFY);

let sslOption: boolean | Record<string, any> | undefined = undefined;

if (ssl) {
if (sslSkipVerify) {
// TLS enabled, but server TLS certificate is not verified
sslOption = {
rejectUnauthorized: false,
checkServerIdentity: () => undefined,
};
} else if (process.env.KAFKA_SSL_CA) {
// TLS enabled, server TLS certificate is verified using a custom CA certificate
sslOption = {
ca: readFileSync(process.env.KAFKA_SSL_CA, "utf-8"),
};
} else {
// TLS enabled, no extra configurations
sslOption = true;
}
}

return {
brokers: requireDefined(process.env.KAFKA_BOOTSTRAP_SERVERS, "env KAFKA_BOOTSTRAP_SERVERS is required").split(","),
ssl: process.env.KAFKA_SSL === "true" || process.env.KAFKA_SSL === "1",
ssl: sslOption,
sasl: process.env.KAFKA_SASL ? JSON5.parse(process.env.KAFKA_SASL) : undefined,
};
}
Expand All @@ -57,13 +81,7 @@ export function connectToKafka(opts: { defaultAppId: string } & KafkaCredentials
// },
clientId: process.env.APPLICATION_ID || opts.defaultAppId,
brokers: typeof opts.brokers === "string" ? opts.brokers.split(",") : opts.brokers,
ssl: opts.ssl
? {
rejectUnauthorized: false,
checkServerIdentity: () => undefined,
}
: undefined,

ssl: opts.ssl,
...sasl,
});
}
Expand Down

0 comments on commit f3dab7b

Please sign in to comment.