Skip to content

Commit

Permalink
Implement Kafka Authentication & clean up timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
ThisIsMissEm committed May 28, 2024
1 parent b80b53d commit 9d5d98e
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
2 changes: 0 additions & 2 deletions src/define_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ export function defineConfig(config = {}): KafkaConfig {
return {
brokers: 'localhost:9092',
clientId: 'local',
connectionTimeout: 3000,
requestTimeout: 60000,
logLevel: 'info',
// Overwrite default config values if another one is provided
...config,
Expand Down
10 changes: 7 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ export class Kafka implements KafkaContract {
private createKafka() {
this.#kafka = new KafkaJs({
brokers: this.getBrokers(),
clientId: this.#config.clientId || 'local',
connectionTimeout: this.#config.connectionTimeout,
requestTimeout: this.#config.requestTimeout,
ssl: this.#config.ssl,
sasl: this.#config.sasl,
clientId: this.#config.clientId,
connectionTimeout: this.#config.timeouts?.connection,
requestTimeout: this.#config.timeouts?.request,
authenticationTimeout: this.#config.timeouts?.authentication,
reauthenticationThreshold: this.#config.timeouts?.reauthentication,
logLevel: toKafkaLogLevel(this.#config.logLevel),
logCreator: (logLevel: KafkaLogLevel) => {
this.#logger.level = toAdonisLoggerLevel(logLevel)
Expand Down
14 changes: 12 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import type {
ConsumerRunConfig as KafkaConsumerRunConfig,
Message as KafkaMessage,
EachMessagePayload as KafkaEachMessagePayload,
SASLOptions,
} from 'kafkajs'

import type tls from 'node:tls'

import type { Level } from '@adonisjs/logger/types'
import type { Consumer } from './consumer.ts'
import type { Producer } from './producer.ts'
Expand Down Expand Up @@ -37,9 +41,15 @@ declare module '@adonisjs/core/types' {

export interface KafkaConfig {
brokers?: string | string[]
ssl?: tls.ConnectionOptions | boolean
sasl?: SASLOptions
clientId?: string
connectionTimeout?: number
requestTimeout?: number
timeouts?: {
connection?: number
authentication?: number
reauthentication?: number
request?: number
}
logLevel: Level
}

Expand Down
2 changes: 0 additions & 2 deletions stubs/config/kafka.stub
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import env from '#start/env'
const kafkaConfig = {
brokers: env.get('KAFKA_BROKERS', 'localhost:9092'),
clientId: env.get('KAFKA_CLIENT_ID'),
connectionTimeout: env.get('KAFKA_CONNECTION_TIMEOUT', 3000),
requestTimeout: env.get('KAFKA_REQUEST_TIMEOUT', 60000),
logLevel: env.get('KAFKA_LOG_LEVEL', env.get('LOG_LEVEL')),
}

Expand Down

0 comments on commit 9d5d98e

Please sign in to comment.