3 changes: 3 additions & 0 deletions .changelog/3680.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/mongodbatlas_stream_connection: Add new authentication mechanism(OIDC) to the Kafka connection
```

> Review comment (Member): changelog entries for the 2 datasources are missing.
>
> Suggested change (use 3rd person for changelog messages): "Add new authentication mechanism(OIDC) to the Kafka connection" → "Adds new authentication mechanism(OIDC) to the Kafka connection".
8 changes: 7 additions & 1 deletion docs/data-sources/stream_connection.md
@@ -43,9 +43,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, `SCRAM-512`, or `OAUTHBEARER`.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer(IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client.
> Review comment (Member): do all new attributes apply to Kafka? In that case, consider nesting them inside a kafka_config attribute, although it's probably better to follow the structure of the Atlas API as-is.

* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Kafka clients use this to specify the scope of the access request to the broker.
* `sasl_oauthbearer_extensions` - Additional information to be provided to the Kafka broker.
* `https_ca_pem` - The CA certificates as a PEM string.
> Review comment (@jwilliams-mongo, Sep 10, 2025): do you think it would be helpful to note these should be concatenated? Are any specific certs in the chain not required? Is a specific order required?
>
> Reply (Author): What do you mean by "be concatenated"? No specific order is required. We don't want to mark the fields as optional or required because the OpenID protocol is very flexible, so we can't guarantee whether a given field is mandatory. For backward compatibility, we don't indicate whether it is mandatory or not.


### Security

8 changes: 7 additions & 1 deletion docs/data-sources/stream_connections.md
@@ -54,9 +54,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, `SCRAM-512`, or `OAUTHBEARER`.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer(IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client. It must be unique across all clients that the authorization server handles.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Kafka clients use this to specify the scope of the access request to the broker.
* `sasl_oauthbearer_extensions` - Additional information to be provided to the Kafka broker.
* `https_ca_pem` - The CA certificates as a PEM string.

### Security

32 changes: 32 additions & 0 deletions docs/resources/stream_connection.md
@@ -55,6 +55,38 @@ resource "mongodbatlas_stream_connection" "test" {
}
```

### Example Kafka SASL OAuthbearer Connection
> Review comment (@lantoli, Sep 12, 2025): the new attributes' description is missing in the resource doc page.

```terraform
resource "mongodbatlas_stream_connection" "example-kafka-oauthbearer" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = "KafkaOAuthbearerConnection"
type = "Kafka"
authentication = {
mechanism = "OAUTHBEARER"
token_endpoint_url = "https://example.com/oauth/token"
client_id = "auth0Client"
client_secret = var.kafka_client_secret
scope = "read:messages write:messages"
sasl_oauthbearer_extensions = "logicalCluster=lkc-kmom,identityPoolId=pool-lAr"
https_ca_pem = "pemtext"
}
bootstrap_servers = "localhost:9092,localhost:9092"
config = {
"auto.offset.reset" : "earliest"
}
security = {
protocol = "SASL_PLAINTEXT"
}
networking = {
access = {
type = "PUBLIC"
}
}
}
```

### Example Kafka SASL SSL Connection

```terraform
28 changes: 28 additions & 0 deletions examples/mongodbatlas_stream_connection/main.tf
@@ -56,6 +56,34 @@ resource "mongodbatlas_stream_connection" "example-kafka-plaintext" {
}
}

resource "mongodbatlas_stream_connection" "example-kafka-oauthbearer" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = "KafkaOAuthbearerConnection"
type = "Kafka"
authentication = {
mechanism = "OAUTHBEARER"
token_endpoint_url = "https://example.com/oauth/token"
client_id = "auth0Client"
client_secret = var.kafka_client_secret
scope = "read:messages write:messages"
sasl_oauthbearer_extensions = "logicalCluster=lkc-kmom,identityPoolId=pool-lAr"
https_ca_pem = "pemtext"
}
bootstrap_servers = "localhost:9092,localhost:9092"
config = {
"auto.offset.reset" : "earliest"
}
security = {
protocol = "SASL_PLAINTEXT"
}
networking = {
access = {
type = "PUBLIC"
}
}
}

resource "mongodbatlas_stream_connection" "example-kafka-ssl" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
5 changes: 5 additions & 0 deletions examples/mongodbatlas_stream_connection/variables.tf
@@ -21,6 +21,11 @@ variable "kafka_password" {
type = string
}

variable "kafka_client_secret" {
description = "Secret known only to the Kafka client and the authorization server"
type = string
}

variable "kafka_ssl_cert" {
description = "Public certificate used for SASL_SSL configuration to connect to your Kafka cluster"
type = string
22 changes: 17 additions & 5 deletions internal/service/streamconnection/model_stream_connection.go
@@ -27,9 +27,15 @@ func NewStreamConnectionReq(ctx context.Context, plan *TFStreamConnectionModel)
return nil, diags
}
streamConnection.Authentication = &admin.StreamsKafkaAuthentication{
Mechanism: authenticationModel.Mechanism.ValueStringPointer(),
Password: authenticationModel.Password.ValueStringPointer(),
Username: authenticationModel.Username.ValueStringPointer(),
Mechanism: authenticationModel.Mechanism.ValueStringPointer(),
Password: authenticationModel.Password.ValueStringPointer(),
Username: authenticationModel.Username.ValueStringPointer(),
TokenEndpointUrl: authenticationModel.TokenEndpointURL.ValueStringPointer(),
ClientId: authenticationModel.ClientID.ValueStringPointer(),
ClientSecret: authenticationModel.ClientSecret.ValueStringPointer(),
Scope: authenticationModel.Scope.ValueStringPointer(),
SaslOauthbearerExtensions: authenticationModel.SaslOauthbearerExtensions.ValueStringPointer(),
HttpsCaPem: authenticationModel.HTTPSCaPem.ValueStringPointer(),
}
}
if !plan.Security.IsNull() {
@@ -215,8 +221,13 @@ func NewTFStreamConnection(ctx context.Context, projID, instanceName string, cur
func newTFConnectionAuthenticationModel(ctx context.Context, currAuthConfig *types.Object, authResp *admin.StreamsKafkaAuthentication) (*types.Object, diag.Diagnostics) {
if authResp != nil {
resultAuthModel := TFConnectionAuthenticationModel{
Mechanism: types.StringPointerValue(authResp.Mechanism),
Username: types.StringPointerValue(authResp.Username),
Mechanism: types.StringPointerValue(authResp.Mechanism),
Username: types.StringPointerValue(authResp.Username),
TokenEndpointURL: types.StringPointerValue(authResp.TokenEndpointUrl),
ClientID: types.StringPointerValue(authResp.ClientId),
Scope: types.StringPointerValue(authResp.Scope),
SaslOauthbearerExtensions: types.StringPointerValue(authResp.SaslOauthbearerExtensions),
HTTPSCaPem: types.StringPointerValue(authResp.HttpsCaPem),
}

if currAuthConfig != nil && !currAuthConfig.IsNull() { // if config is available (create & update of resource) password value is set in new state
@@ -225,6 +236,7 @@ func newTFConnectionAuthenticationModel(ctx context.Context, currAuthConfig *typ
return nil, diags
}
resultAuthModel.Password = configAuthModel.Password
resultAuthModel.ClientSecret = configAuthModel.ClientSecret
}

resultObject, diags := types.ObjectValueFrom(ctx, ConnectionAuthenticationObjectType.AttrTypes, resultAuthModel)
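One detail worth highlighting in the change above: the Atlas API does not return write-only secrets, so the provider copies `password` (and now `client_secret`) into new state from the prior config rather than from the API response. A simplified Go sketch of that pattern (stand-in structs, not the provider's framework types):

```go
package main

import "fmt"

// apiAuth mimics the relevant subset of an API response: write-only
// secrets always come back empty. (Simplified stand-in, not the SDK type.)
type apiAuth struct {
	Mechanism    string
	ClientSecret string // always "" in responses
}

// stateAuth mimics the Terraform state/config model.
type stateAuth struct {
	Mechanism    string
	ClientSecret string
}

// mergeAuth builds new state from the API response, carrying write-only
// secrets forward from the prior config — the same idea as copying
// Password and ClientSecret in newTFConnectionAuthenticationModel.
func mergeAuth(resp apiAuth, config *stateAuth) stateAuth {
	out := stateAuth{Mechanism: resp.Mechanism}
	if config != nil { // config is nil on import, where no secret is known
		out.ClientSecret = config.ClientSecret
	}
	return out
}

func main() {
	resp := apiAuth{Mechanism: "OAUTHBEARER"} // secret omitted by the API
	cfg := &stateAuth{Mechanism: "OAUTHBEARER", ClientSecret: "s3cret"}
	fmt.Println(mergeAuth(resp, cfg).ClientSecret)
}
```

Without this carry-over, every refresh would see the empty API value and produce a perpetual diff on the sensitive attribute.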
@@ -11,13 +11,21 @@ import (
)

const (
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authUsername = "user1"
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authMechanismOAuth = "OAUTHBEARER"
authUsername = "user1"
clientID = "auth0Client"
clientSecret = "secret"
// #nosec G101
tokenEndpointURL = "https://your-domain.com/oauth2/token"
scope = "read:messages write:messages"
saslOauthbearerExtentions = "logicalCluster=cluster-kmo17m,identityPoolId=pool-l7Arl"
httpsCaPem = "MHWER3343"
securityProtocol = "SASL_SSL"
bootstrapServers = "localhost:9092,another.host:9092"
dbRole = "customRole"
@@ -50,6 +58,7 @@ type sdkToTFModelTestCase struct {

func TestStreamConnectionSDKToTFModel(t *testing.T) {
var authConfigWithPasswordDefined = tfAuthenticationObject(t, authMechanism, authUsername, "raw password")
var authConfigWithOAuth = tfAuthenticationObjectForOAuth(t, authMechanismOAuth, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtentions, httpsCaPem)

testCases := []sdkToTFModelTestCase{
{
@@ -146,6 +155,44 @@
Headers: types.MapNull(types.StringType),
},
},
{
name: "Kafka connection type SDK response for OAuthBearer authentication",
SDKResp: &admin.StreamsConnection{
Name: admin.PtrString(connectionName),
Type: admin.PtrString("Kafka"),
Authentication: &admin.StreamsKafkaAuthentication{
Mechanism: admin.PtrString(authMechanismOAuth),
ClientId: admin.PtrString(clientID),
TokenEndpointUrl: admin.PtrString(tokenEndpointURL),
Scope: admin.PtrString(scope),
SaslOauthbearerExtensions: admin.PtrString(saslOauthbearerExtentions),
HttpsCaPem: admin.PtrString(httpsCaPem),
},
BootstrapServers: admin.PtrString(bootstrapServers),
Config: &configMap,
Security: &admin.StreamsKafkaSecurity{
Protocol: admin.PtrString(securityProtocol),
BrokerPublicCertificate: admin.PtrString(DummyCACert),
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: &authConfigWithOAuth,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
ConnectionName: types.StringValue(connectionName),
Type: types.StringValue("Kafka"),
Authentication: tfAuthenticationObjectForOAuth(t, authMechanismOAuth, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtentions, httpsCaPem), // client_secret value is obtained from config, not api resp.
BootstrapServers: types.StringValue(bootstrapServers),
Config: tfConfigMap(t, configMap),
Security: tfSecurityObject(t, DummyCACert, securityProtocol),
DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes),
Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes),
AWS: types.ObjectNull(streamconnection.AWSObjectType.AttrTypes),
Headers: types.MapNull(types.StringType),
},
},
{
name: "Kafka connection type SDK response with no optional values provided",
SDKResp: &admin.StreamsConnection{
@@ -596,6 +643,23 @@ func tfAuthenticationObject(t *testing.T, mechanism, username, password string)
return auth
}

func tfAuthenticationObjectForOAuth(t *testing.T, mechanism, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtensions, httpsCaPem string) types.Object {
t.Helper()
auth, diags := types.ObjectValueFrom(t.Context(), streamconnection.ConnectionAuthenticationObjectType.AttrTypes, streamconnection.TFConnectionAuthenticationModel{
Mechanism: types.StringValue(mechanism),
ClientID: types.StringValue(clientID),
ClientSecret: types.StringValue(clientSecret),
TokenEndpointURL: types.StringValue(tokenEndpointURL),
Scope: types.StringValue(scope),
SaslOauthbearerExtensions: types.StringValue(saslOauthbearerExtensions),
HTTPSCaPem: types.StringValue(httpsCaPem),
})
if diags.HasError() {
t.Errorf("failed to create terraform data model: %s", diags.Errors()[0].Summary())
}
return auth
}

func tfAuthenticationObjectWithNoPassword(t *testing.T, mechanism, username string) types.Object {
t.Helper()
auth, diags := types.ObjectValueFrom(t.Context(), streamconnection.ConnectionAuthenticationObjectType.AttrTypes, streamconnection.TFConnectionAuthenticationModel{
19 changes: 19 additions & 0 deletions internal/service/streamconnection/resource_schema.go
@@ -82,6 +82,25 @@ func ResourceSchema(ctx context.Context) schema.Schema {
"username": schema.StringAttribute{
Optional: true,
},
"token_endpoint_url": schema.StringAttribute{
Optional: true,
},
"client_id": schema.StringAttribute{
Optional: true,
},
"client_secret": schema.StringAttribute{
Optional: true,
Sensitive: true,
},
> Review comment (Member), on `scope`: q: does Atlas return some default value if not provided by the client? In that case the attribute might need to be Optional & Computed, although it's better to keep it as Optional if possible.

"scope": schema.StringAttribute{
Optional: true,
},
"sasl_oauthbearer_extensions": schema.StringAttribute{
Optional: true,
},
"https_ca_pem": schema.StringAttribute{
Optional: true,
},
},
},
"bootstrap_servers": schema.StringAttribute{
24 changes: 18 additions & 6 deletions internal/service/streamconnection/resource_stream_connection.go
@@ -54,15 +54,27 @@ type TFStreamConnectionModel struct {
}

type TFConnectionAuthenticationModel struct {
Mechanism types.String `tfsdk:"mechanism"`
Password types.String `tfsdk:"password"`
Username types.String `tfsdk:"username"`
Mechanism types.String `tfsdk:"mechanism"`
Password types.String `tfsdk:"password"`
Username types.String `tfsdk:"username"`
TokenEndpointURL types.String `tfsdk:"token_endpoint_url"`
ClientID types.String `tfsdk:"client_id"`
ClientSecret types.String `tfsdk:"client_secret"`
Scope types.String `tfsdk:"scope"`
SaslOauthbearerExtensions types.String `tfsdk:"sasl_oauthbearer_extensions"`
HTTPSCaPem types.String `tfsdk:"https_ca_pem"`
}

var ConnectionAuthenticationObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{
"mechanism": types.StringType,
"password": types.StringType,
"username": types.StringType,
"mechanism": types.StringType,
"password": types.StringType,
"username": types.StringType,
"token_endpoint_url": types.StringType,
"client_id": types.StringType,
"client_secret": types.StringType,
"scope": types.StringType,
"sasl_oauthbearer_extensions": types.StringType,
"https_ca_pem": types.StringType,
}}
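The `tfsdk` tags on `TFConnectionAuthenticationModel` must line up one-to-one with the keys of `ConnectionAuthenticationObjectType` — drift between them only surfaces at runtime when `ObjectValueFrom` fails. A small reflection check of the pattern (stand-in types, not the provider's):

```go
package main

import (
	"fmt"
	"reflect"
)

// Stand-in for the state model: tag names must match the attribute map keys.
type authModel struct {
	Mechanism    string `tfsdk:"mechanism"`
	ClientSecret string `tfsdk:"client_secret"`
	HTTPSCaPem   string `tfsdk:"https_ca_pem"`
}

// Stand-in for the ObjectType's AttrTypes map (values simplified to strings).
var authAttrTypes = map[string]string{
	"mechanism":     "string",
	"client_secret": "string",
	"https_ca_pem":  "string",
}

// tagsMatchAttrs reports whether every tfsdk tag has an attribute entry
// and the two sets are the same size (so neither side has extras).
func tagsMatchAttrs(model any, attrs map[string]string) bool {
	t := reflect.TypeOf(model)
	if t.NumField() != len(attrs) {
		return false
	}
	for i := 0; i < t.NumField(); i++ {
		if _, ok := attrs[t.Field(i).Tag.Get("tfsdk")]; !ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(tagsMatchAttrs(authModel{}, authAttrTypes)) // true
}
```

A check like this makes a forgotten entry (e.g. adding a struct field without the matching `types.StringType` key) fail fast in unit tests instead of at apply time.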

type TFConnectionSecurityModel struct {