Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/modules/components/pages/inputs/postgres_cdc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ input:
aws:
enabled: false
region: "" # No default (optional)
endpoint: "" # No default (required)
endpoint: ""
id: "" # No default (optional)
secret: "" # No default (optional)
token: "" # No default (optional)
Expand Down Expand Up @@ -484,11 +484,12 @@ The AWS region where the PostgreSQL instance is located. If no region is specifi

=== `aws.endpoint`

The PostgreSQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com).
The PostgreSQL endpoint as `host:port` (for example `mydb.abc123.us-east-1.rds.amazonaws.com:5432`). The AWS IAM token signer requires the port; if it is omitted the port from the DSN is used, falling back to `5432`. When this field is empty the address from the DSN is used.


*Type*: `string`

*Default*: `""`

=== `aws.id`

Expand Down
11 changes: 3 additions & 8 deletions internal/impl/aws/s3/bench/read/redpanda-connect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ func seedBucket(ctx context.Context, client *s3.Client, total, objSize, numWorke
errCh := make(chan error, 1)

for range numWorkers {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for n := range jobs {
if err := uploadObject(ctx, client, n, objSize); err != nil {
select {
Expand All @@ -154,7 +152,7 @@ func seedBucket(ctx context.Context, client *s3.Client, total, objSize, numWorke
fmt.Printf("Progress: %d/%d objects (%.0f obj/sec)\n", done, total, float64(done)/elapsed)
}
}
}()
})
}

for i := range total {
Expand Down Expand Up @@ -196,10 +194,7 @@ func uploadObject(ctx context.Context, client *s3.Client, n, size int) error {
func makeObjectBody(n, size int) []byte {
prefix := fmt.Sprintf(`{"id":"obj-%d","created_at":"%s","data":"`, n, time.Now().UTC().Format(time.RFC3339))
suffix := `"}`
pad := size - len(prefix) - len(suffix)
if pad < 0 {
pad = 0
}
pad := max(size-len(prefix)-len(suffix), 0)
return []byte(prefix + strings.Repeat("x", pad) + suffix)
}

Expand Down
36 changes: 36 additions & 0 deletions internal/impl/postgresql/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ package aws

import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -31,6 +35,8 @@ import (
pgstream "github.com/redpanda-data/connect/v4/internal/impl/postgresql"
)

const defaultPostgresPort = "5432"

type roleConfig struct {
arn string
externalID string
Expand All @@ -57,6 +63,9 @@ func awsIAMAuth(ctx context.Context, awsConf *service.ParsedConfig, dbConf *pgco
if endpoint, err = awsConf.FieldString("endpoint"); err != nil {
return nil, err
}
if endpoint, err = normalizeIAMEndpoint(endpoint, dbConf.Host, dbConf.Port); err != nil {
return nil, err
}
if region, _ = awsConf.FieldString("region"); region != "" {
opts = append(opts, awsconfig.WithRegion(region))
}
Expand Down Expand Up @@ -149,6 +158,33 @@ func assumeRoleChain(ctx context.Context, awsCfg aws.Config, roles []roleConfig,
return currentConfig, nil
}

// normalizeIAMEndpoint returns a `host:port` value suitable for
// auth.BuildAuthToken. The AWS SDK rejects an endpoint without a port, so this
// helper fills one in from the parsed connection config: if `endpoint` is
// empty the DSN host/port pair is used; if `endpoint` is a bare hostname the
// DSN's port is appended, with the well-known PostgreSQL port as a final
// fallback.
func normalizeIAMEndpoint(endpoint, dsnHost string, dsnPort uint16) (string, error) {
if endpoint == "" {
if dsnHost == "" {
return "", errors.New("aws IAM authentication requires an endpoint and the DSN does not contain a host")
}
port := defaultPostgresPort
if dsnPort != 0 {
port = strconv.Itoa(int(dsnPort))
}
return net.JoinHostPort(dsnHost, port), nil
}
if strings.Contains(endpoint, ":") {
return endpoint, nil
}
port := defaultPostgresPort
if dsnPort != 0 {
port = strconv.Itoa(int(dsnPort))
}
return net.JoinHostPort(endpoint, port), nil
}

func parseRoleConfig(awsConf *service.ParsedConfig) ([]roleConfig, error) {
var roles []roleConfig
if role, err := awsConf.FieldString("role"); err != nil {
Expand Down
87 changes: 87 additions & 0 deletions internal/impl/postgresql/aws/aws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2025 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aws

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNormalizeIAMEndpoint(t *testing.T) {
cases := []struct {
name string
endpoint string
dsnHost string
dsnPort uint16
want string
wantErr bool
}{
{
name: "endpoint already has port",
endpoint: "mydb.rds.amazonaws.com:5432",
dsnHost: "mydb.rds.amazonaws.com",
dsnPort: 5432,
want: "mydb.rds.amazonaws.com:5432",
},
{
name: "endpoint missing port, derive from DSN",
endpoint: "mydb.rds.amazonaws.com",
dsnHost: "mydb.rds.amazonaws.com",
dsnPort: 5433,
want: "mydb.rds.amazonaws.com:5433",
},
{
name: "endpoint missing port, DSN port zero, fall back to 5432",
endpoint: "mydb.rds.amazonaws.com",
dsnHost: "mydb.rds.amazonaws.com",
dsnPort: 0,
want: "mydb.rds.amazonaws.com:5432",
},
{
name: "endpoint empty, fall back to DSN host:port",
endpoint: "",
dsnHost: "mydb.rds.amazonaws.com",
dsnPort: 5432,
want: "mydb.rds.amazonaws.com:5432",
},
{
name: "endpoint empty, DSN port zero, append default port",
endpoint: "",
dsnHost: "mydb.rds.amazonaws.com",
dsnPort: 0,
want: "mydb.rds.amazonaws.com:5432",
},
{
name: "endpoint empty and DSN host empty errors",
endpoint: "",
dsnHost: "",
dsnPort: 5432,
wantErr: true,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, err := normalizeIAMEndpoint(tc.endpoint, tc.dsnHost, tc.dsnPort)
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}
3 changes: 2 additions & 1 deletion internal/impl/postgresql/input_pg_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ This connector uses the naming pattern ` + "`pglog_stream_<replication_slot_name
Description("The AWS region where the PostgreSQL instance is located. If no region is specified then the environment default will be used.").
Optional(),
service.NewStringField("endpoint").
Description("The PostgreSQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com)."),
Description("The PostgreSQL endpoint as `host:port` (for example `mydb.abc123.us-east-1.rds.amazonaws.com:5432`). The AWS IAM token signer requires the port; if it is omitted the port from the DSN is used, falling back to `5432`. When this field is empty the address from the DSN is used.").
Default(""),
service.NewStringField("id").
Description("The ID of credentials to use.").
Optional().Advanced(),
Expand Down
Loading