From c0b0f93656f90e46d76c2883618681b755f9cb19 Mon Sep 17 00:00:00 2001 From: Stewart Boyd Date: Fri, 6 Sep 2024 10:45:55 -0700 Subject: [PATCH] RMPMP-242: Updated `WithDeadLetterTopic` option to borrow username and password from ConsumerTopicConfig when those issues aren't specified on DeadLetterTopicConfig (#11) RMPMP-242: Updated `WithDeadLetterTopic` option to borrow username and password from ConsumerTopicConfig when those issues aren't specified on DeadLetterTopicConfig --- changelog.md | 4 ++ test/worker_test.go | 135 ++++++++++++++++++++++++++++++++++++++++++++ workoption.go | 11 +++- 3 files changed, 149 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 99e2fda..ca74bb1 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. +## 1.0.2 (Sep 6, 2024) + +1. Updated `WithDeadLetterTopic` option to borrow username and password from ConsumerTopicConfig when those issues aren't specified on DeadLetterTopicConfig + ## 1.0.1 (Sep 3, 2024) 1. Added dlt topic name in error logs on dlt write failure diff --git a/test/worker_test.go b/test/worker_test.go index 59aa29d..cab8e52 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -803,6 +803,141 @@ func TestWork_WithDeadLetterTopic_MessagesWrittenToDLTSinceErrorOccurred(t *test require.NoError(t, grp.Wait()) } +func TestWork_WithDeadLetterTopic_DLTWriterConfigCanBorrowFromConsumerConfig(t *testing.T) { + type testCase struct { + name string + inputConfig zkafka.ConsumerTopicConfig + expectedDLTConfig zkafka.ProducerTopicConfig + } + + testCases := []testCase{ + { + name: "deadletter-username-password-preferred-over-consumer-config", + inputConfig: zkafka.ConsumerTopicConfig{ + Topic: topicName, + SaslUsername: ptr("user_orig"), + SaslPassword: ptr("pw_orig"), + DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{ + SaslUsername: ptr("user"), + SaslPassword: ptr("pw"), + ClientID: uuid.NewString(), + Topic: "topic2", + }, + }, + expectedDLTConfig: zkafka.ProducerTopicConfig{ + SaslUsername: ptr("user"), + SaslPassword: ptr("pw"), + }, + }, + { + name: "consumer-config-username-password-used-in-abscense-of-dlt-username-password", + inputConfig: zkafka.ConsumerTopicConfig{ + Topic: topicName, + SaslUsername: ptr("user_orig"), + SaslPassword: ptr("pw_orig"), + DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{ + ClientID: uuid.NewString(), + Topic: "topic2", + }, + }, + expectedDLTConfig: zkafka.ProducerTopicConfig{ + SaslUsername: ptr("user_orig"), + SaslPassword: ptr("pw_orig"), + }, + }, + { + name: "username-password-not-specified-is-okay", + inputConfig: zkafka.ConsumerTopicConfig{ + ClientID: uuid.NewString(), + Topic: topicName, + DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{ + Topic: "topic2", + }, + }, + expectedDLTConfig: zkafka.ProducerTopicConfig{ + SaslUsername: nil, + SaslPassword: nil, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer recoverThenFail(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + l := zkafka.NoopLogger{} + + mockReader := zkafka_mocks.NewMockReader(ctrl) + msg1 := getRandomMessage() + gomock.InOrder( + mockReader.EXPECT().Read(gomock.Any()).Return(msg1, nil), + mockReader.EXPECT().Read(gomock.Any()).Return(nil, nil).AnyTimes(), + ) + mockReader.EXPECT().Close().Return(nil).AnyTimes() + + mockWriter := zkafka_mocks.NewMockWriter(ctrl) + // each errored message gets forwarded + mockWriter.EXPECT().WriteRaw(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockWriter.EXPECT().Close().AnyTimes() + + mockClientProvider := zkafka_mocks.NewMockClientProvider(ctrl) + mockClientProvider.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(mockReader, nil) + var gotDLTConfig zkafka.ProducerTopicConfig + mockClientProvider.EXPECT().Writer(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, topicConfig zkafka.ProducerTopicConfig, opts ...zkafka.WriterOption) (zkafka.Writer, error) { + gotDLTConfig = topicConfig + return mockWriter, nil + }) + + kwf := zkafka.NewWorkFactory(mockClientProvider, zkafka.WithLogger(l)) + + processor := fakeProcessor{ + process: func(ctx context.Context, message *zkafka.Message) error { + return errors.New("processor error") + }, + } + + cfg := tc.inputConfig + w1 := kwf.Create( + cfg, + &processor, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + grp := errgroup.Group{} + grp.Go(func() error { + return w1.Run(ctx, nil) + }) + + pollWait(func() bool { + return len(processor.ProcessedMessages()) >= 1 + }, pollOpts{ + timeoutExit: func() { + require.Fail(t, "Timed out during poll") + }, + pollPause: time.Millisecond, + maxWait: 10 * time.Second, + }) + cancel() + require.NoError(t, grp.Wait()) + if tc.expectedDLTConfig.SaslUsername == nil { + require.Nil(t, gotDLTConfig.SaslUsername) + } else { + require.NotNil(t, gotDLTConfig.SaslUsername) + require.Equal(t, *tc.expectedDLTConfig.SaslUsername, *gotDLTConfig.SaslUsername) + } + if tc.expectedDLTConfig.SaslPassword == nil { + require.Nil(t, gotDLTConfig.SaslPassword) + } else { + require.NotNil(t, gotDLTConfig.SaslPassword) + require.Equal(t, *tc.expectedDLTConfig.SaslPassword, *gotDLTConfig.SaslPassword) + } + }) + } +} + // TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing shows that even if get topic writer (for DLT) returns error processing still continues. // This test configures a single virtual partition to process the reader. If processing halted on account of DLT write error, // the test wouldn't get through all 10 messages diff --git a/workoption.go b/workoption.go index 7270107..78b6674 100644 --- a/workoption.go +++ b/workoption.go @@ -115,9 +115,18 @@ func (d dltOption) apply(w *Work) { return } + // if not specified explicitly in dlt config, use username/pw from consumerTopicConfig + dltConfig := d.dltConfig + if dltConfig.SaslUsername == nil || *d.dltConfig.SaslUsername == "" { + dltConfig.SaslUsername = w.topicConfig.SaslUsername + } + + if dltConfig.SaslPassword == nil || *d.dltConfig.SaslPassword == "" { + dltConfig.SaslPassword = w.topicConfig.SaslPassword + } // even if we're going to skip forwarding a message to the DLT (because there was no error), // establish a writer to the DLT early, so when the time comes the write is fast - writer, err := w.kafkaProvider.Writer(ctx, d.dltConfig) + writer, err := w.kafkaProvider.Writer(ctx, dltConfig) if err != nil { w.logger.Errorw(ctx, "Failed to get writer for dlt", "error", err, "offset", message.Offset, "partition", message.Partition, "source_topic", message.Topic, "dlt_topic", d.dltConfig.Topic) return