Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions AUTH.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ KafkaEx supports SASL authentication for secure Kafka clusters. Multiple mechani
- **PLAIN** - Simple username/password (requires SSL/TLS)
- **SCRAM-SHA-256** - Secure challenge-response authentication (Kafka 0.10.2+)
- **SCRAM-SHA-512** - Secure challenge-response with stronger hash (Kafka 0.10.2+)
- **SCRAM-SHA-512** - Secure challenge-response with stronger hash (Kafka 0.10.2+)
- **OAUTHBEARER** - Token-based authentication using OAuth 2.0 bearer tokens (Kafka 2.0+)

## Configuration

Expand Down Expand Up @@ -58,6 +60,7 @@ docker-compose up -d
# 9092 - No authentication (SSL)
# 9192 - SASL/PLAIN (SSL)
# 9292 - SASL/SCRAM (SSL)
# 9392 - SASL/Oauthbearer
```

## Security Considerations
Expand All @@ -70,6 +73,7 @@ docker-compose up -d

- PLAIN: Kafka 0.9.0+
- SCRAM: Kafka 0.10.2+
- SASL-Oauthbearer: Kafka 2.0+

## Testing with Different Mechanisms

Expand All @@ -94,6 +98,20 @@ config :kafka_ex,
}
```

## OAUTHBEARER
```elixir
config :kafka_ex,
brokers: [{"localhost", 9394}],
use_ssl: true,
sasl: %{
mechanism: :oauthbearer,
mechanism_opts: %{
token_provider: &MyOAuth.get_token/0,
extensions: %{"traceId" => "optional"}
}
}
```

## Integration with Existing Code

SASL authentication is transparent to the rest of your KafkaEx usage:
Expand Down
15 changes: 9 additions & 6 deletions docker-compose-arm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ services:
- "9092:9092" # NOAUTH
- "9192:9192" # PLAIN
- "9292:9292" # SCRAM
- "9392:9392" # OAUTH
environment:
KAFKA_BROKER_ID: "1"

KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292"
KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292,OAUTH://:9392"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292,OAUTH://localhost:9392"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-1:29092"]
interval: 20s
Expand All @@ -46,11 +47,12 @@ services:
- "9093:9093" # NOAUTH
- "9193:9193" # PLAIN
- "9293:9293" # SCRAM
- "9393:9393" # OAUTH
environment:
KAFKA_BROKER_ID: "2"

KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293"
KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293,OAUTH://:9393"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293,OAUTH://localhost:9393"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-2:29093"]
interval: 20s
Expand All @@ -69,11 +71,12 @@ services:
- "9094:9094" # NOAUTH
- "9194:9194" # PLAIN
- "9294:9294" # SCRAM
- "9394:9394" # OAUTH
environment:
KAFKA_BROKER_ID: "3"

KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294"
KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294,OAUTH://:9394"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294,OAUTH://localhost:9394"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-3:29094"]
interval: 20s
Expand Down
8 changes: 6 additions & 2 deletions docker-compose-kafka.env
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ KAFKA_SSL_TRUSTSTORE_PASSWORD=kafka_ex
KAFKA_SSL_KEYSTORE_TYPE=PKCS12
KAFKA_SSL_TRUSTSTORE_TYPE=PKCS12

KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,NOAUTH:SSL,PLAIN:SASL_SSL,SCRAM:SASL_SSL
KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,NOAUTH:SSL,PLAIN:SASL_SSL,SCRAM:SASL_SSL,OAUTH:SASL_SSL
KAFKA_LISTENER_NAME_plain_SECURITY_PROTOCOL=SASL_SSL
KAFKA_LISTENER_NAME_scram_SECURITY_PROTOCOL=SASL_SSL
KAFKA_LISTENER_NAME_INTERNAL_SECURITY_PROTOCOL=PLAINTEXT
Expand All @@ -38,6 +38,10 @@ KAFKA_LISTENER_NAME_scram_SCRAM-SHA-256_SASL_JAAS_CONFIG=org.apache.kafka.common
KAFKA_LISTENER_NAME_scram_SCRAM-SHA-512_SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required;
KAFKA_LISTENER_NAME_plain_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="admin-secret" user_test="secret" user_alice="alice-secret";

KAFKA_LISTENER_NAME_oauth_SASL_ENABLED_MECHANISMS=OAUTHBEARER
KAFKA_LISTENER_NAME_oauth_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="test";
KAFKA_LISTENER_NAME_oauth_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler

######## Config

KAFKA_DELETE_TOPIC_ENABLE=true
Expand Down
15 changes: 9 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ services:
- "9092:9092" # NOAUTH
- "9192:9192" # PLAIN
- "9292:9292" # SCRAM
- "9392:9392" # OAUTH
environment:
KAFKA_BROKER_ID: "1"

KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292"
KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292,OAUTH://:9392"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292,OAUTH://localhost:9392"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-1:29092"]
interval: 20s
Expand All @@ -46,11 +47,12 @@ services:
- "9093:9093" # NOAUTH
- "9193:9193" # PLAIN
- "9293:9293" # SCRAM
- "9393:9393" # OAUTH
environment:
KAFKA_BROKER_ID: "2"

KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293"
KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293,OAUTH://:9393"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293,OAUTH://localhost:9393"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-2:29093"]
interval: 20s
Expand All @@ -69,11 +71,12 @@ services:
- "9094:9094" # NOAUTH
- "9194:9194" # PLAIN
- "9294:9294" # SCRAM
- "9394:9394" # OAUTH
environment:
KAFKA_BROKER_ID: "3"

KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294"
KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294,OAUTH://:9394"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294,OAUTH://localhost:9394"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-3:29094"]
interval: 20s
Expand Down
13 changes: 13 additions & 0 deletions lib/kafka_ex/auth/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,19 @@ defmodule KafkaEx.Auth.Config do
require_keys!(cfg, [:username, :password])
cfg

:oauthbearer ->
opts = cfg[:mechanism_opts] || %{}

unless is_function(opts[:token_provider], 0) do
raise ArgumentError, "oauthbearer requires mechanism_opts.token_provider/0"
end

if is_map(opts[:extensions]) and Map.has_key?(opts[:extensions], "auth") do
raise ArgumentError, "'auth' is a reserved extension name"
end

Map.merge(%{username: nil, password: nil}, cfg)

_ ->
raise ArgumentError, "Unsupported SASL mechanism: #{inspect(mech)}"
end
Expand Down
5 changes: 3 additions & 2 deletions lib/kafka_ex/auth/sasl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ defmodule KafkaEx.Auth.SASL do

@mechanisms %{
plain: KafkaEx.Auth.SASL.Plain,
scram: KafkaEx.Auth.SASL.Scram
# Future extensions: oauthbearer, msk_iam_auth
scram: KafkaEx.Auth.SASL.Scram,
oauthbearer: KafkaEx.Auth.SASL.OAuthBearer
# Future extensions: msk_iam_auth
}

# -------- Public API --------
Expand Down
63 changes: 63 additions & 0 deletions lib/kafka_ex/auth/sasl/oauthbearer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule KafkaEx.Auth.SASL.OAuthBearer do
@moduledoc """
SASL/OAUTHBEARER mechanism for Kafka (KIP-255, KIP-342).

## Configuration

KafkaEx.Auth.Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{
token_provider: fn -> {:ok, "jwt-token"} end,
extensions: %{"traceId" => "abc123"} # optional, KIP-342
}
})

## Token Provider

The `token_provider` is a 0-arity function returning:
- `{:ok, token}` - JWT token string
- `{:error, reason}` - failure

The provider is called on each new connection and is responsible for
caching and refreshing tokens.
"""

@behaviour KafkaEx.Auth.Mechanism

alias KafkaEx.Auth.Config

@impl true
def mechanism_name(_config), do: "OAUTHBEARER"

@impl true
def authenticate(%Config{mechanism_opts: opts}, send_fun) do
with {:ok, token} <- fetch_token(opts) do
case send_fun.(build_initial_response(token, opts)) do
{:ok, _} -> :ok
{:error, _} = err -> err
end
end
end

defp fetch_token(%{token_provider: provider}) when is_function(provider, 0) do
case provider.() do
{:ok, token} when is_binary(token) and byte_size(token) > 0 -> {:ok, token}
{:ok, _} -> {:error, :invalid_token}
{:error, _} = err -> err
end
end

defp fetch_token(_), do: {:error, :missing_token_provider}

# RFC 7628 / KIP-255: "n,,\x01auth=Bearer <token>[\x01ext=val...]\x01\x01"
defp build_initial_response(token, opts) do
extensions = Map.get(opts, :extensions, %{})
"n,,\x01auth=Bearer #{token}#{encode_extensions(extensions)}\x01\x01"
end

defp encode_extensions(exts) when map_size(exts) == 0, do: ""

defp encode_extensions(exts) do
Enum.map_join(exts, "", fn {k, v} -> "\x01#{k}=#{v}" end)
end
end
77 changes: 77 additions & 0 deletions test/integration/oauthbearer_authentication_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule KafkaEx.Integration.OAuthBearerAuthenticationTest do
use ExUnit.Case

@moduletag :integration
@moduletag :oauthbearer

describe "SASL/OAUTHBEARER authentication" do
@tag sasl: :oauthbearer
test "connects and produces with unsecured JWT" do
token = build_unsecured_jwt("test-user")

opts = [
uris: [{"localhost", 9394}],
use_ssl: true,
ssl_options: [verify: :verify_none],
auth:
KafkaEx.Auth.Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{token_provider: fn -> {:ok, token} end}
})
]

{:ok, _pid} = KafkaEx.create_worker(:oauth_worker, opts)

assert KafkaEx.produce("test_topic", 0, "oauth_msg", worker_name: :oauth_worker) == :ok
# probably redundant, but whatever
assert %KafkaEx.Protocol.Metadata.Response{} = KafkaEx.metadata(worker_name: :oauth_worker)
end

@tag sasl: :oauthbearer
test "connects with extensions" do
token = build_unsecured_jwt("test-user")

opts = [
uris: [{"localhost", 9394}],
use_ssl: true,
ssl_options: [verify: :verify_none],
auth:
KafkaEx.Auth.Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{
token_provider: fn -> {:ok, token} end,
extensions: %{"traceId" => "test-123"}
}
})
]

{:ok, _pid} = KafkaEx.create_worker(:oauth_ext_worker, opts)
# probably redundant, but whatever
assert %KafkaEx.Protocol.Metadata.Response{} = KafkaEx.metadata(worker_name: :oauth_ext_worker)
end

@tag sasl: :oauthbearer
test "fails with invalid token" do
opts = [
uris: [{"localhost", 9394}],
use_ssl: true,
ssl_options: [verify: :verify_none],
auth:
KafkaEx.Auth.Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{token_provider: fn -> {:ok, "invalid"} end}
})
]

assert {:error, _} = KafkaEx.create_worker(:oauth_bad_worker, opts)
end
end

defp build_unsecured_jwt(subject) do
# "alg":"none" as we have no jwks in tests
header = Base.url_encode64(~s({"alg":"none","typ":"JWT"}), padding: false)
now = System.system_time(:second)
payload = Base.url_encode64(~s({"sub":"#{subject}","iat":#{now},"exp":#{now + 3600}}), padding: false)
"#{header}.#{payload}."
end
end
56 changes: 56 additions & 0 deletions test/kafka_ex/auth/config_oauthbearer_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule KafkaEx.Auth.ConfigOAuthBearerTest do
use ExUnit.Case, async: true

alias KafkaEx.Auth.Config

describe "new/1 with oauthbearer" do
test "builds config with token_provider" do
cfg =
Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{token_provider: fn -> {:ok, "t"} end}
})

assert cfg.mechanism == :oauthbearer
assert is_nil(cfg.username)
assert is_nil(cfg.password)
end

test "accepts extensions" do
cfg =
Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{
token_provider: fn -> {:ok, "t"} end,
extensions: %{"foo" => "bar"}
}
})

assert cfg.mechanism_opts.extensions == %{"foo" => "bar"}
end

test "raises without token_provider" do
assert_raise ArgumentError, ~r/token_provider/, fn ->
Config.new(%{mechanism: :oauthbearer, mechanism_opts: %{}})
end
end

test "raises when token_provider is not a function" do
assert_raise ArgumentError, ~r/token_provider/, fn ->
Config.new(%{mechanism: :oauthbearer, mechanism_opts: %{token_provider: "not-a-fn"}})
end
end

test "raises on reserved 'auth' extension" do
assert_raise ArgumentError, ~r/reserved/, fn ->
Config.new(%{
mechanism: :oauthbearer,
mechanism_opts: %{
token_provider: fn -> {:ok, "t"} end,
extensions: %{"auth" => "bad"}
}
})
end
end
end
end
Loading
Loading