Skip to content

Commit

Permalink
RMPMP-242: Updated WithDeadLetterTopic option to borrow username an…
Browse files Browse the repository at this point in the history
…d password from ConsumerTopicConfig when those issues aren't specified on DeadLetterTopicConfig (#11)

RMPMP-242: Updated `WithDeadLetterTopic` option to borrow username and
password from ConsumerTopicConfig when those issues aren't specified on
DeadLetterTopicConfig
  • Loading branch information
stewartboyd119 authored Sep 6, 2024
1 parent 658363e commit c0b0f93
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 1 deletion.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.0.2 (Sep 6, 2024)

1. Updated `WithDeadLetterTopic` option to borrow username and password from ConsumerTopicConfig when those issues aren't specified on DeadLetterTopicConfig

## 1.0.1 (Sep 3, 2024)

1. Added dlt topic name in error logs on dlt write failure
Expand Down
135 changes: 135 additions & 0 deletions test/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,141 @@ func TestWork_WithDeadLetterTopic_MessagesWrittenToDLTSinceErrorOccurred(t *test
require.NoError(t, grp.Wait())
}

func TestWork_WithDeadLetterTopic_DLTWriterConfigCanBorrowFromConsumerConfig(t *testing.T) {
type testCase struct {
name string
inputConfig zkafka.ConsumerTopicConfig
expectedDLTConfig zkafka.ProducerTopicConfig
}

testCases := []testCase{
{
name: "deadletter-username-password-preferred-over-consumer-config",
inputConfig: zkafka.ConsumerTopicConfig{
Topic: topicName,
SaslUsername: ptr("user_orig"),
SaslPassword: ptr("pw_orig"),
DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
SaslUsername: ptr("user"),
SaslPassword: ptr("pw"),
ClientID: uuid.NewString(),
Topic: "topic2",
},
},
expectedDLTConfig: zkafka.ProducerTopicConfig{
SaslUsername: ptr("user"),
SaslPassword: ptr("pw"),
},
},
{
name: "consumer-config-username-password-used-in-abscense-of-dlt-username-password",
inputConfig: zkafka.ConsumerTopicConfig{
Topic: topicName,
SaslUsername: ptr("user_orig"),
SaslPassword: ptr("pw_orig"),
DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
ClientID: uuid.NewString(),
Topic: "topic2",
},
},
expectedDLTConfig: zkafka.ProducerTopicConfig{
SaslUsername: ptr("user_orig"),
SaslPassword: ptr("pw_orig"),
},
},
{
name: "username-password-not-specified-is-okay",
inputConfig: zkafka.ConsumerTopicConfig{
ClientID: uuid.NewString(),
Topic: topicName,
DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
Topic: "topic2",
},
},
expectedDLTConfig: zkafka.ProducerTopicConfig{
SaslUsername: nil,
SaslPassword: nil,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer recoverThenFail(t)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

l := zkafka.NoopLogger{}

mockReader := zkafka_mocks.NewMockReader(ctrl)
msg1 := getRandomMessage()
gomock.InOrder(
mockReader.EXPECT().Read(gomock.Any()).Return(msg1, nil),
mockReader.EXPECT().Read(gomock.Any()).Return(nil, nil).AnyTimes(),
)
mockReader.EXPECT().Close().Return(nil).AnyTimes()

mockWriter := zkafka_mocks.NewMockWriter(ctrl)
// each errored message gets forwarded
mockWriter.EXPECT().WriteRaw(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockWriter.EXPECT().Close().AnyTimes()

mockClientProvider := zkafka_mocks.NewMockClientProvider(ctrl)
mockClientProvider.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(mockReader, nil)
var gotDLTConfig zkafka.ProducerTopicConfig
mockClientProvider.EXPECT().Writer(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, topicConfig zkafka.ProducerTopicConfig, opts ...zkafka.WriterOption) (zkafka.Writer, error) {
gotDLTConfig = topicConfig
return mockWriter, nil
})

kwf := zkafka.NewWorkFactory(mockClientProvider, zkafka.WithLogger(l))

processor := fakeProcessor{
process: func(ctx context.Context, message *zkafka.Message) error {
return errors.New("processor error")
},
}

cfg := tc.inputConfig
w1 := kwf.Create(
cfg,
&processor,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grp := errgroup.Group{}
grp.Go(func() error {
return w1.Run(ctx, nil)
})

pollWait(func() bool {
return len(processor.ProcessedMessages()) >= 1
}, pollOpts{
timeoutExit: func() {
require.Fail(t, "Timed out during poll")
},
pollPause: time.Millisecond,
maxWait: 10 * time.Second,
})
cancel()
require.NoError(t, grp.Wait())
if tc.expectedDLTConfig.SaslUsername == nil {
require.Nil(t, gotDLTConfig.SaslUsername)
} else {
require.NotNil(t, gotDLTConfig.SaslUsername)
require.Equal(t, *tc.expectedDLTConfig.SaslUsername, *gotDLTConfig.SaslUsername)
}
if tc.expectedDLTConfig.SaslPassword == nil {
require.Nil(t, gotDLTConfig.SaslPassword)
} else {
require.NotNil(t, gotDLTConfig.SaslPassword)
require.Equal(t, *tc.expectedDLTConfig.SaslPassword, *gotDLTConfig.SaslPassword)
}
})
}
}

// TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing shows that even if get topic writer (for DLT) returns error processing still continues.
// This test configures a single virtual partition to process the reader. If processing halted on account of DLT write error,
// the test wouldn't get through all 10 messages
Expand Down
11 changes: 10 additions & 1 deletion workoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,18 @@ func (d dltOption) apply(w *Work) {
return
}

// if not specified explicitly in dlt config, use username/pw from consumerTopicConfig
dltConfig := d.dltConfig
if dltConfig.SaslUsername == nil || *d.dltConfig.SaslUsername == "" {
dltConfig.SaslUsername = w.topicConfig.SaslUsername
}

if dltConfig.SaslPassword == nil || *d.dltConfig.SaslPassword == "" {
dltConfig.SaslPassword = w.topicConfig.SaslPassword
}
// even if we're going to skip forwarding a message to the DLT (because there was no error),
// establish a writer to the DLT early, so when the time comes the write is fast
writer, err := w.kafkaProvider.Writer(ctx, d.dltConfig)
writer, err := w.kafkaProvider.Writer(ctx, dltConfig)
if err != nil {
w.logger.Errorw(ctx, "Failed to get writer for dlt", "error", err, "offset", message.Offset, "partition", message.Partition, "source_topic", message.Topic, "dlt_topic", d.dltConfig.Topic)
return
Expand Down

0 comments on commit c0b0f93

Please sign in to comment.