Skip to content

Commit

Permalink
Log an error if the Kafka partitions auto-creation fail with an error…
Browse files Browse the repository at this point in the history
… in the Kafka response (#9884)

* Log an error if the Kafka partitions auto-creation fail with an error in the Kafka response

Signed-off-by: Marco Pracucci <[email protected]>

* Fix linter

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Nov 13, 2024
1 parent cc5f5cc commit 91e0980
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 21 deletions.
12 changes: 11 additions & 1 deletion pkg/storage/ingest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,24 @@ func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg KafkaConfig, logger lo
defer adm.Close()

defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
_, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
responses, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
{
Op: kadm.SetConfig,
Name: "num.partitions",
Value: &defaultNumberOfPartitions,
},
})

// Check if any error has been returned as part of the response.
if err == nil {
for _, res := range responses {
if res.Err != nil {
err = res.Err
break
}
}
}

if err != nil {
level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
return
Expand Down
111 changes: 91 additions & 20 deletions pkg/storage/ingest/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kmsg"
)
Expand Down Expand Up @@ -129,32 +131,101 @@ func TestResultPromise(t *testing.T) {
}

func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) {
cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
require.NoError(t, err)
t.Cleanup(cluster.Close)
createKafkaCluster := func(t *testing.T) (string, *kfake.Cluster) {
cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
require.NoError(t, err)
t.Cleanup(cluster.Close)

addrs := cluster.ListenAddrs()
require.Len(t, addrs, 1)
addrs := cluster.ListenAddrs()
require.Len(t, addrs, 1)

cfg := KafkaConfig{
Address: addrs[0],
AutoCreateTopicDefaultPartitions: 100,
return addrs[0], cluster
}

cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
r := request.(*kmsg.AlterConfigsRequest)
t.Run("should create the partitions", func(t *testing.T) {
var (
addr, cluster = createKafkaCluster(t)
cfg = KafkaConfig{
Address: addr,
AutoCreateTopicDefaultPartitions: 100,
}
)

cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
r := request.(*kmsg.AlterConfigsRequest)

require.Len(t, r.Resources, 1)
res := r.Resources[0]
require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
require.Len(t, res.Configs, 1)
cfg := res.Configs[0]
require.Equal(t, "num.partitions", cfg.Name)
require.NotNil(t, *cfg.Value)
require.Equal(t, "100", *cfg.Value)

return &kmsg.AlterConfigsResponse{
Version: r.Version,
}, nil, true
})

logs := concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(&logs)

setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, logger)
require.NotContains(t, logs.String(), "err")
})

t.Run("should log an error if the request fails", func(t *testing.T) {
var (
addr, cluster = createKafkaCluster(t)
cfg = KafkaConfig{
Address: addr,
AutoCreateTopicDefaultPartitions: 100,
}
)

cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(_ kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.AlterConfigsResponse{}, errors.New("failed request"), true
})

logs := concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(&logs)

require.Len(t, r.Resources, 1)
res := r.Resources[0]
require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
require.Len(t, res.Configs, 1)
cfg := res.Configs[0]
require.Equal(t, "num.partitions", cfg.Name)
require.NotNil(t, *cfg.Value)
require.Equal(t, "100", *cfg.Value)
setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, logger)
require.Contains(t, logs.String(), "err")
})

t.Run("should log an error if the request succeed but the response contains an error", func(t *testing.T) {
var (
addr, cluster = createKafkaCluster(t)
cfg = KafkaConfig{
Address: addr,
AutoCreateTopicDefaultPartitions: 100,
}
)

return &kmsg.AlterConfigsResponse{}, nil, true
cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(kReq kmsg.Request) (kmsg.Response, error, bool) {
req := kReq.(*kmsg.AlterConfigsRequest)
require.Len(t, req.Resources, 1)

return &kmsg.AlterConfigsResponse{
Version: req.Version,
Resources: []kmsg.AlterConfigsResponseResource{
{
ResourceType: req.Resources[0].ResourceType,
ResourceName: req.Resources[0].ResourceName,
ErrorCode: kerr.InvalidRequest.Code,
ErrorMessage: pointerOf(kerr.InvalidRequest.Message),
},
},
}, nil, true
})

logs := concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(&logs)

setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, logger)
require.Contains(t, logs.String(), "err")
})

setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, log.NewNopLogger())
}

0 comments on commit 91e0980

Please sign in to comment.