diff --git a/go.mod b/go.mod index 7984924dd..9e8c33118 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( go.opentelemetry.io/otel/trace v1.37.0 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.40.0 + golang.org/x/oauth2 v0.30.0 golang.org/x/sync v0.16.0 golang.org/x/time v0.12.0 google.golang.org/api v0.243.0 @@ -104,7 +105,6 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/oauth2 v0.30.0 // indirect google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect ) diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index ff184a861..81527a764 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -18,8 +18,10 @@ import ( "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/sasl/aws" + "github.com/twmb/franz-go/pkg/sasl/oauth" "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/pkg/sasl/scram" + "golang.org/x/oauth2/google" ) type KafkaConfig = configtypes.KafkaConsumerConfig @@ -143,6 +145,8 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) { AccessKey: c.config.SASLUser, SecretKey: c.config.SASLPassword, }.AsManagedStreamingIAMMechanism())) + case "google-oauth-bearer-adc": + opts = append(opts, kgo.SASL(oauth.Oauth(fetchGoogleOAuthTokenFromADC))) default: return nil, fmt.Errorf("unsupported SASL mechanism: %s", c.config.SASLMechanism) } @@ -161,6 +165,29 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) { return client, nil } +func fetchGoogleOAuthTokenFromADC(ctx context.Context) (oauth.Auth, error) { + const scope = "https://www.googleapis.com/auth/cloud-platform" + + cred, err := google.FindDefaultCredentials(ctx, scope) + if err != nil { + return oauth.Auth{}, fmt.Errorf("failed to get ADC credentials: %w", err) + } + + tok, err := cred.TokenSource.Token() + if err != nil { + return oauth.Auth{}, fmt.Errorf("failed to retrieve token: %w", err) + } + + if !tok.Valid() { + return oauth.Auth{}, fmt.Errorf("received invalid token") + } + + return oauth.Auth{ + Token: tok.AccessToken, + Extensions: map[string]string{}, + }, nil +} + func (c *KafkaConsumer) leaveGroup(ctx context.Context, client *kgo.Client) error { req := kmsg.NewPtrLeaveGroupRequest() req.Group = c.config.ConsumerGroup