From 7be4548f0f93cfa2799778f80b5907d51bfd72cf Mon Sep 17 00:00:00 2001 From: David Yu Date: Fri, 15 May 2026 22:05:52 -0700 Subject: [PATCH 1/2] postgres_cdc: relax aws.endpoint to accept bare hostnames The AWS SDK's `auth.BuildAuthToken` rejects an endpoint without a port ("the provided endpoint is missing a port, or the provided port is invalid"), but `aws.endpoint` was documented as a bare hostname. Mirror the mysql_cdc fix from #4433: derive the port from the parsed pgconn config when the user omits it (falling back to 5432), and fall back to the DSN address entirely when `aws.endpoint` is empty. Field is now optional with a `""` default. Backwards compatible: explicit `host:port` values are still passed through unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../components/pages/inputs/postgres_cdc.adoc | 5 +- internal/impl/postgresql/aws/aws.go | 36 ++++++++ internal/impl/postgresql/aws/aws_test.go | 87 +++++++++++++++++++ internal/impl/postgresql/input_pg_stream.go | 3 +- 4 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 internal/impl/postgresql/aws/aws_test.go diff --git a/docs/modules/components/pages/inputs/postgres_cdc.adoc b/docs/modules/components/pages/inputs/postgres_cdc.adoc index a7d64a843b..69283db293 100644 --- a/docs/modules/components/pages/inputs/postgres_cdc.adoc +++ b/docs/modules/components/pages/inputs/postgres_cdc.adoc @@ -92,7 +92,7 @@ input: aws: enabled: false region: "" # No default (optional) - endpoint: "" # No default (required) + endpoint: "" id: "" # No default (optional) secret: "" # No default (optional) token: "" # No default (optional) @@ -484,11 +484,12 @@ The AWS region where the PostgreSQL instance is located. If no region is specifi === `aws.endpoint` -The PostgreSQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com). +The PostgreSQL endpoint as `host:port` (for example `mydb.abc123.us-east-1.rds.amazonaws.com:5432`). 
The AWS IAM token signer requires the port; if it is omitted the port from the DSN is used, falling back to `5432`. When this field is empty the address from the DSN is used. *Type*: `string` +*Default*: `""` === `aws.id` diff --git a/internal/impl/postgresql/aws/aws.go b/internal/impl/postgresql/aws/aws.go index 968c86d01f..8b5e539e20 100644 --- a/internal/impl/postgresql/aws/aws.go +++ b/internal/impl/postgresql/aws/aws.go @@ -16,7 +16,11 @@ package aws import ( "context" + "errors" "fmt" + "net" + "strconv" + "strings" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" @@ -31,6 +35,8 @@ import ( pgstream "github.com/redpanda-data/connect/v4/internal/impl/postgresql" ) +const defaultPostgresPort = "5432" + type roleConfig struct { arn string externalID string @@ -57,6 +63,9 @@ func awsIAMAuth(ctx context.Context, awsConf *service.ParsedConfig, dbConf *pgco if endpoint, err = awsConf.FieldString("endpoint"); err != nil { return nil, err } + if endpoint, err = normalizeIAMEndpoint(endpoint, dbConf.Host, dbConf.Port); err != nil { + return nil, err + } if region, _ = awsConf.FieldString("region"); region != "" { opts = append(opts, awsconfig.WithRegion(region)) } @@ -149,6 +158,33 @@ func assumeRoleChain(ctx context.Context, awsCfg aws.Config, roles []roleConfig, return currentConfig, nil } +// normalizeIAMEndpoint returns a `host:port` value suitable for +// auth.BuildAuthToken. The AWS SDK rejects an endpoint without a port, so this +// helper fills one in from the parsed connection config: if `endpoint` is +// empty the DSN host/port pair is used; if `endpoint` is a bare hostname the +// DSN's port is appended, with the well-known PostgreSQL port as a final +// fallback. 
+func normalizeIAMEndpoint(endpoint, dsnHost string, dsnPort uint16) (string, error) { + if endpoint == "" { + if dsnHost == "" { + return "", errors.New("aws IAM authentication requires an endpoint and the DSN does not contain a host") + } + port := defaultPostgresPort + if dsnPort != 0 { + port = strconv.Itoa(int(dsnPort)) + } + return net.JoinHostPort(dsnHost, port), nil + } + if strings.Contains(endpoint, ":") { + return endpoint, nil + } + port := defaultPostgresPort + if dsnPort != 0 { + port = strconv.Itoa(int(dsnPort)) + } + return net.JoinHostPort(endpoint, port), nil +} + func parseRoleConfig(awsConf *service.ParsedConfig) ([]roleConfig, error) { var roles []roleConfig if role, err := awsConf.FieldString("role"); err != nil { diff --git a/internal/impl/postgresql/aws/aws_test.go b/internal/impl/postgresql/aws/aws_test.go new file mode 100644 index 0000000000..be26c846eb --- /dev/null +++ b/internal/impl/postgresql/aws/aws_test.go @@ -0,0 +1,87 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package aws + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNormalizeIAMEndpoint(t *testing.T) { + cases := []struct { + name string + endpoint string + dsnHost string + dsnPort uint16 + want string + wantErr bool + }{ + { + name: "endpoint already has port", + endpoint: "mydb.rds.amazonaws.com:5432", + dsnHost: "mydb.rds.amazonaws.com", + dsnPort: 5432, + want: "mydb.rds.amazonaws.com:5432", + }, + { + name: "endpoint missing port, derive from DSN", + endpoint: "mydb.rds.amazonaws.com", + dsnHost: "mydb.rds.amazonaws.com", + dsnPort: 5433, + want: "mydb.rds.amazonaws.com:5433", + }, + { + name: "endpoint missing port, DSN port zero, fall back to 5432", + endpoint: "mydb.rds.amazonaws.com", + dsnHost: "mydb.rds.amazonaws.com", + dsnPort: 0, + want: "mydb.rds.amazonaws.com:5432", + }, + { + name: "endpoint empty, fall back to DSN host:port", + endpoint: "", + dsnHost: "mydb.rds.amazonaws.com", + dsnPort: 5432, + want: "mydb.rds.amazonaws.com:5432", + }, + { + name: "endpoint empty, DSN port zero, append default port", + endpoint: "", + dsnHost: "mydb.rds.amazonaws.com", + dsnPort: 0, + want: "mydb.rds.amazonaws.com:5432", + }, + { + name: "endpoint empty and DSN host empty errors", + endpoint: "", + dsnHost: "", + dsnPort: 5432, + wantErr: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got, err := normalizeIAMEndpoint(tc.endpoint, tc.dsnHost, tc.dsnPort) + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) + } +} diff --git a/internal/impl/postgresql/input_pg_stream.go b/internal/impl/postgresql/input_pg_stream.go index 8ab2911a1f..f3d95101bc 100644 --- a/internal/impl/postgresql/input_pg_stream.go +++ b/internal/impl/postgresql/input_pg_stream.go @@ -156,7 +156,8 @@ This connector uses the naming pattern ` + "`pglog_stream_ Date: Fri, 15 May 2026 21:49:34 -0700 Subject: [PATCH 2/2] chore: fix pre-existing modernize lint hits 
in s3 read bench The S3 read benchmark added in #4369 (commit bed70d9e) is failing the modernize linter on main: `waitgroup` wants WaitGroup.Go on line 141, `minmax` wants max() on line 200. Both fixes are mechanical and unrelated to postgres_cdc, included here so this PR's golangci-lint job can be green. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../impl/aws/s3/bench/read/redpanda-connect/main.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/internal/impl/aws/s3/bench/read/redpanda-connect/main.go b/internal/impl/aws/s3/bench/read/redpanda-connect/main.go index cb38503838..9cf774bc03 100644 --- a/internal/impl/aws/s3/bench/read/redpanda-connect/main.go +++ b/internal/impl/aws/s3/bench/read/redpanda-connect/main.go @@ -137,9 +137,7 @@ func seedBucket(ctx context.Context, client *s3.Client, total, objSize, numWorke errCh := make(chan error, 1) for range numWorkers { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { for n := range jobs { if err := uploadObject(ctx, client, n, objSize); err != nil { select { @@ -154,7 +152,7 @@ func seedBucket(ctx context.Context, client *s3.Client, total, objSize, numWorke fmt.Printf("Progress: %d/%d objects (%.0f obj/sec)\n", done, total, float64(done)/elapsed) } } - }() + }) } for i := range total { @@ -196,10 +194,7 @@ func uploadObject(ctx context.Context, client *s3.Client, n, size int) error { func makeObjectBody(n, size int) []byte { prefix := fmt.Sprintf(`{"id":"obj-%d","created_at":"%s","data":"`, n, time.Now().UTC().Format(time.RFC3339)) suffix := `"}` - pad := size - len(prefix) - len(suffix) - if pad < 0 { - pad = 0 - } + pad := max(size-len(prefix)-len(suffix), 0) return []byte(prefix + strings.Repeat("x", pad) + suffix) }