Skip to content

Commit 1f27c33

Browse files
committed
Oauthbearer SASL implementation
1 parent 22e3e26 commit 1f27c33

File tree

10 files changed

+324
-16
lines changed

10 files changed

+324
-16
lines changed

AUTH.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ KafkaEx supports SASL authentication for secure Kafka clusters. Multiple mechani
77
- **PLAIN** - Simple username/password (requires SSL/TLS)
88
- **SCRAM-SHA-256** - Secure challenge-response authentication (Kafka 0.10.2+)
99
- **SCRAM-SHA-512** - Secure challenge-response with stronger hash (Kafka 0.10.2+)
10+
- **SCRAM-SHA-512** - Secure challenge-response with stronger hash (Kafka 0.10.2+)
11+
- **OAUTHBEARER** - Token-based authentication using OAuth 2.0 bearer tokens (Kafka 2.0+)
1012

1113
## Configuration
1214

@@ -58,6 +60,7 @@ docker-compose up -d
5860
# 9092 - No authentication (SSL)
5961
# 9192 - SASL/PLAIN (SSL)
6062
# 9292 - SASL/SCRAM (SSL)
63+
# 9392 - SASL/Oauthbearer
6164
```
6265

6366
## Security Considerations
@@ -70,6 +73,7 @@ docker-compose up -d
7073

7174
- PLAIN: Kafka 0.9.0+
7275
- SCRAM: Kafka 0.10.2+
76+
- SASL-Oauthbearer: Kafka 2.0+
7377

7478
## Testing with Different Mechanisms
7579

@@ -94,6 +98,20 @@ config :kafka_ex,
9498
}
9599
```
96100

101+
## OAUTHBEARER
102+
```elixir
103+
config :kafka_ex,
104+
brokers: [{"localhost", 9394}],
105+
use_ssl: true,
106+
sasl: %{
107+
mechanism: :oauthbearer,
108+
mechanism_opts: %{
109+
token_provider: &MyOAuth.get_token/0,
110+
extensions: %{"traceId" => "optional"}
111+
}
112+
}
113+
```
114+
97115
## Integration with Existing Code
98116

99117
SASL authentication is transparent to the rest of your KafkaEx usage:

docker-compose-arm.yml

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ services:
2323
- "9092:9092" # NOAUTH
2424
- "9192:9192" # PLAIN
2525
- "9292:9292" # SCRAM
26+
- "9392:9392" # OAUTH
2627
environment:
2728
KAFKA_BROKER_ID: "1"
2829

29-
KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292"
30-
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292"
30+
KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292,OAUTH://:9392"
31+
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292,OAUTH://localhost:9392"
3132
healthcheck:
3233
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-1:29092"]
3334
interval: 20s
@@ -46,11 +47,12 @@ services:
4647
- "9093:9093" # NOAUTH
4748
- "9193:9193" # PLAIN
4849
- "9293:9293" # SCRAM
50+
- "9393:9393" # OAUTH
4951
environment:
5052
KAFKA_BROKER_ID: "2"
5153

52-
KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293"
53-
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293"
54+
KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293,OAUTH://:9393"
55+
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293,OAUTH://localhost:9393"
5456
healthcheck:
5557
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-2:29093"]
5658
interval: 20s
@@ -69,11 +71,12 @@ services:
6971
- "9094:9094" # NOAUTH
7072
- "9194:9194" # PLAIN
7173
- "9294:9294" # SCRAM
74+
- "9394:9394" # OAUTH
7275
environment:
7376
KAFKA_BROKER_ID: "3"
7477

75-
KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294"
76-
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294"
78+
KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294,OAUTH://:9394"
79+
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294,OAUTH://localhost:9394"
7780
healthcheck:
7881
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-3:29094"]
7982
interval: 20s

docker-compose-kafka.env

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ KAFKA_SSL_TRUSTSTORE_PASSWORD=kafka_ex
2525
KAFKA_SSL_KEYSTORE_TYPE=PKCS12
2626
KAFKA_SSL_TRUSTSTORE_TYPE=PKCS12
2727

28-
KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
29-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,NOAUTH:SSL,PLAIN:SASL_SSL,SCRAM:SASL_SSL
28+
KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
29+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,NOAUTH:SSL,PLAIN:SASL_SSL,SCRAM:SASL_SSL,OAUTH:SASL_SSL
3030
KAFKA_LISTENER_NAME_plain_SECURITY_PROTOCOL=SASL_SSL
3131
KAFKA_LISTENER_NAME_scram_SECURITY_PROTOCOL=SASL_SSL
3232
KAFKA_LISTENER_NAME_INTERNAL_SECURITY_PROTOCOL=PLAINTEXT
@@ -38,6 +38,10 @@ KAFKA_LISTENER_NAME_scram_SCRAM-SHA-256_SASL_JAAS_CONFIG=org.apache.kafka.common
3838
KAFKA_LISTENER_NAME_scram_SCRAM-SHA-512_SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required;
3939
KAFKA_LISTENER_NAME_plain_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="admin-secret" user_test="secret" user_alice="alice-secret";
4040

41+
KAFKA_LISTENER_NAME_oauth_SASL_ENABLED_MECHANISMS=OAUTHBEARER
42+
KAFKA_LISTENER_NAME_oauth_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="test";
43+
KAFKA_LISTENER_NAME_oauth_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler
44+
4145
######## Config
4246

4347
KAFKA_DELETE_TOPIC_ENABLE=true

docker-compose.yml

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ services:
2323
- "9092:9092" # NOAUTH
2424
- "9192:9192" # PLAIN
2525
- "9292:9292" # SCRAM
26+
- "9392:9392" # OAUTH
2627
environment:
2728
KAFKA_BROKER_ID: "1"
2829

29-
KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292"
30-
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292"
30+
KAFKA_LISTENERS: "INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292,OAUTH://:9392"
31+
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292,OAUTH://localhost:9392"
3132
healthcheck:
3233
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-1:29092"]
3334
interval: 20s
@@ -46,11 +47,12 @@ services:
4647
- "9093:9093" # NOAUTH
4748
- "9193:9193" # PLAIN
4849
- "9293:9293" # SCRAM
50+
- "9393:9393" # OAUTH
4951
environment:
5052
KAFKA_BROKER_ID: "2"
5153

52-
KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293"
53-
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293"
54+
KAFKA_LISTENERS: "INTERNAL://:29093,NOAUTH://:9093,PLAIN://:9193,SCRAM://:9293,OAUTH://:9393"
55+
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29093,NOAUTH://localhost:9093,PLAIN://localhost:9193,SCRAM://localhost:9293,OAUTH://localhost:9393"
5456
healthcheck:
5557
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-2:29093"]
5658
interval: 20s
@@ -69,11 +71,12 @@ services:
6971
- "9094:9094" # NOAUTH
7072
- "9194:9194" # PLAIN
7173
- "9294:9294" # SCRAM
74+
- "9394:9394" # OAUTH
7275
environment:
7376
KAFKA_BROKER_ID: "3"
7477

75-
KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294"
76-
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294"
78+
KAFKA_LISTENERS: "INTERNAL://:29094,NOAUTH://:9094,PLAIN://:9194,SCRAM://:9294,OAUTH://:9394"
79+
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29094,NOAUTH://localhost:9094,PLAIN://localhost:9194,SCRAM://localhost:9294,OAUTH://localhost:9394"
7780
healthcheck:
7881
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "kafka-3:29094"]
7982
interval: 20s

lib/kafka_ex/auth/config.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,19 @@ defmodule KafkaEx.Auth.Config do
123123
require_keys!(cfg, [:username, :password])
124124
cfg
125125

126+
:oauthbearer ->
127+
opts = cfg[:mechanism_opts] || %{}
128+
129+
unless is_function(opts[:token_provider], 0) do
130+
raise ArgumentError, "oauthbearer requires mechanism_opts.token_provider/0"
131+
end
132+
133+
if is_map(opts[:extensions]) and Map.has_key?(opts[:extensions], "auth") do
134+
raise ArgumentError, "'auth' is a reserved extension name"
135+
end
136+
137+
Map.merge(%{username: nil, password: nil}, cfg)
138+
126139
_ ->
127140
raise ArgumentError, "Unsupported SASL mechanism: #{inspect(mech)}"
128141
end

lib/kafka_ex/auth/sasl.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ defmodule KafkaEx.Auth.SASL do
4848

4949
@mechanisms %{
5050
plain: KafkaEx.Auth.SASL.Plain,
51-
scram: KafkaEx.Auth.SASL.Scram
52-
# Future extensions: oauthbearer, msk_iam_auth
51+
scram: KafkaEx.Auth.SASL.Scram,
52+
oauthbearer: KafkaEx.Auth.SASL.OAuthBearer
53+
# Future extensions: msk_iam_auth
5354
}
5455

5556
# -------- Public API --------
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
defmodule KafkaEx.Auth.SASL.OAuthBearer do
2+
@moduledoc """
3+
SASL/OAUTHBEARER mechanism for Kafka (KIP-255, KIP-342).
4+
5+
## Configuration
6+
7+
KafkaEx.Auth.Config.new(%{
8+
mechanism: :oauthbearer,
9+
mechanism_opts: %{
10+
token_provider: fn -> {:ok, "jwt-token"} end,
11+
extensions: %{"traceId" => "abc123"} # optional, KIP-342
12+
}
13+
})
14+
15+
## Token Provider
16+
17+
The `token_provider` is a 0-arity function returning:
18+
- `{:ok, token}` - JWT token string
19+
- `{:error, reason}` - failure
20+
21+
The provider is called on each new connection and is responsible for
22+
caching and refreshing tokens.
23+
"""
24+
25+
@behaviour KafkaEx.Auth.Mechanism
26+
27+
alias KafkaEx.Auth.Config
28+
29+
@impl true
30+
def mechanism_name(_config), do: "OAUTHBEARER"
31+
32+
@impl true
33+
def authenticate(%Config{mechanism_opts: opts}, send_fun) do
34+
with {:ok, token} <- fetch_token(opts) do
35+
case send_fun.(build_initial_response(token, opts)) do
36+
{:ok, _} -> :ok
37+
{:error, _} = err -> err
38+
end
39+
end
40+
end
41+
42+
defp fetch_token(%{token_provider: provider}) when is_function(provider, 0) do
43+
case provider.() do
44+
{:ok, token} when is_binary(token) and byte_size(token) > 0 -> {:ok, token}
45+
{:ok, _} -> {:error, :invalid_token}
46+
{:error, _} = err -> err
47+
end
48+
end
49+
50+
defp fetch_token(_), do: {:error, :missing_token_provider}
51+
52+
# RFC 7628 / KIP-255: "n,,\x01auth=Bearer <token>[\x01ext=val...]\x01\x01"
53+
defp build_initial_response(token, opts) do
54+
extensions = Map.get(opts, :extensions, %{})
55+
"n,,\x01auth=Bearer #{token}#{encode_extensions(extensions)}\x01\x01"
56+
end
57+
58+
defp encode_extensions(exts) when map_size(exts) == 0, do: ""
59+
60+
defp encode_extensions(exts) do
61+
Enum.map_join(exts, "", fn {k, v} -> "\x01#{k}=#{v}" end)
62+
end
63+
end
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
defmodule KafkaEx.Integration.OAuthBearerAuthenticationTest do
2+
use ExUnit.Case
3+
4+
@moduletag :integration
5+
@moduletag :oauthbearer
6+
7+
describe "SASL/OAUTHBEARER authentication" do
8+
@tag sasl: :oauthbearer
9+
test "connects and produces with unsecured JWT" do
10+
token = build_unsecured_jwt("test-user")
11+
12+
opts = [
13+
uris: [{"localhost", 9394}],
14+
use_ssl: true,
15+
ssl_options: [verify: :verify_none],
16+
auth:
17+
KafkaEx.Auth.Config.new(%{
18+
mechanism: :oauthbearer,
19+
mechanism_opts: %{token_provider: fn -> {:ok, token} end}
20+
})
21+
]
22+
23+
{:ok, _pid} = KafkaEx.create_worker(:oauth_worker, opts)
24+
25+
assert KafkaEx.produce("test_topic", 0, "oauth_msg", worker_name: :oauth_worker) == :ok
26+
# probably redundant, but whatever
27+
assert %KafkaEx.Protocol.Metadata.Response{} = KafkaEx.metadata(worker_name: :oauth_worker)
28+
end
29+
30+
@tag sasl: :oauthbearer
31+
test "connects with extensions" do
32+
token = build_unsecured_jwt("test-user")
33+
34+
opts = [
35+
uris: [{"localhost", 9394}],
36+
use_ssl: true,
37+
ssl_options: [verify: :verify_none],
38+
auth:
39+
KafkaEx.Auth.Config.new(%{
40+
mechanism: :oauthbearer,
41+
mechanism_opts: %{
42+
token_provider: fn -> {:ok, token} end,
43+
extensions: %{"traceId" => "test-123"}
44+
}
45+
})
46+
]
47+
48+
{:ok, _pid} = KafkaEx.create_worker(:oauth_ext_worker, opts)
49+
# probably redundant, but whatever
50+
assert %KafkaEx.Protocol.Metadata.Response{} = KafkaEx.metadata(worker_name: :oauth_ext_worker)
51+
end
52+
53+
@tag sasl: :oauthbearer
54+
test "fails with invalid token" do
55+
opts = [
56+
uris: [{"localhost", 9394}],
57+
use_ssl: true,
58+
ssl_options: [verify: :verify_none],
59+
auth:
60+
KafkaEx.Auth.Config.new(%{
61+
mechanism: :oauthbearer,
62+
mechanism_opts: %{token_provider: fn -> {:ok, "invalid"} end}
63+
})
64+
]
65+
66+
assert {:error, _} = KafkaEx.create_worker(:oauth_bad_worker, opts)
67+
end
68+
end
69+
70+
defp build_unsecured_jwt(subject) do
71+
# "alg":"none" as we have no jwks in tests
72+
header = Base.url_encode64(~s({"alg":"none","typ":"JWT"}), padding: false)
73+
now = System.system_time(:second)
74+
payload = Base.url_encode64(~s({"sub":"#{subject}","iat":#{now},"exp":#{now + 3600}}), padding: false)
75+
"#{header}.#{payload}."
76+
end
77+
end
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
defmodule KafkaEx.Auth.ConfigOAuthBearerTest do
2+
use ExUnit.Case, async: true
3+
4+
alias KafkaEx.Auth.Config
5+
6+
describe "new/1 with oauthbearer" do
7+
test "builds config with token_provider" do
8+
cfg =
9+
Config.new(%{
10+
mechanism: :oauthbearer,
11+
mechanism_opts: %{token_provider: fn -> {:ok, "t"} end}
12+
})
13+
14+
assert cfg.mechanism == :oauthbearer
15+
assert is_nil(cfg.username)
16+
assert is_nil(cfg.password)
17+
end
18+
19+
test "accepts extensions" do
20+
cfg =
21+
Config.new(%{
22+
mechanism: :oauthbearer,
23+
mechanism_opts: %{
24+
token_provider: fn -> {:ok, "t"} end,
25+
extensions: %{"foo" => "bar"}
26+
}
27+
})
28+
29+
assert cfg.mechanism_opts.extensions == %{"foo" => "bar"}
30+
end
31+
32+
test "raises without token_provider" do
33+
assert_raise ArgumentError, ~r/token_provider/, fn ->
34+
Config.new(%{mechanism: :oauthbearer, mechanism_opts: %{}})
35+
end
36+
end
37+
38+
test "raises when token_provider is not a function" do
39+
assert_raise ArgumentError, ~r/token_provider/, fn ->
40+
Config.new(%{mechanism: :oauthbearer, mechanism_opts: %{token_provider: "not-a-fn"}})
41+
end
42+
end
43+
44+
test "raises on reserved 'auth' extension" do
45+
assert_raise ArgumentError, ~r/reserved/, fn ->
46+
Config.new(%{
47+
mechanism: :oauthbearer,
48+
mechanism_opts: %{
49+
token_provider: fn -> {:ok, "t"} end,
50+
extensions: %{"auth" => "bad"}
51+
}
52+
})
53+
end
54+
end
55+
end
56+
end

0 commit comments

Comments
 (0)