Commit

NIFI-14053 PublishKafka should allow specification of KafkaKey when using FlowFile publish strategy

Signed-off-by: Matt Burgess <[email protected]>

This closes apache#9557
greyp9 authored and mattyb149 committed Dec 6, 2024
1 parent a555918 commit c4420b4
Showing 3 changed files with 19 additions and 16 deletions.
PublishKafkaIT.java
@@ -35,10 +35,11 @@
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 
 @TestMethodOrder(MethodOrderer.MethodName.class)
 public class PublishKafkaIT extends AbstractPublishKafkaIT {
+    private static final String TEST_KEY_ATTRIBUTE = "my-key";
+    private static final String TEST_KEY_VALUE = "some-key-value";
     private static final String TEST_RECORD_VALUE = "value-" + System.currentTimeMillis();
 
     @Test
@@ -48,11 +49,12 @@ public void test_1_KafkaTestContainerProduceOne() throws InitializationException
         runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
         runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
-        //runner.setProperty(PublishKafka.USE_TRANSACTIONS, Boolean.FALSE.toString());
+        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("a1", "valueA1");
         attributes.put("b1", "valueB1");
+        attributes.put(TEST_KEY_ATTRIBUTE, TEST_KEY_VALUE);
 
         runner.enqueue(TEST_RECORD_VALUE, attributes);
         runner.run();
@@ -66,7 +68,7 @@ public void test_2_KafkaTestContainerConsumeOne() {
         final ConsumerRecords<String, String> records = consumer.poll(DURATION_POLL);
         assertEquals(1, records.count());
         final ConsumerRecord<String, String> record = records.iterator().next();
-        assertNull(record.key());
+        assertEquals(TEST_KEY_VALUE, record.key());
         assertEquals(TEST_RECORD_VALUE, record.value());
         final List<Header> headers = Arrays.asList(record.headers().toArray());
         assertEquals(1, headers.size());
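Distilled from the integration test above: under the FlowFile publish strategy it is now enough to point the Kafka Key property at a FlowFile attribute. A minimal sketch using the NiFi mock framework (package names follow the imports visible in this diff; the topic name is a placeholder, and the Kafka connection service wiring done via addKafkaConnectionService(runner) in the test is omitted):

import java.util.Map;

import org.apache.nifi.kafka.processors.PublishKafka;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

class PublishKafkaKeySketch {

    @Test
    void publishesKeyedRecord() {
        final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
        // A real run also needs the Kafka connection service configured, as the
        // integration test does via addKafkaConnectionService(runner).
        runner.setProperty(PublishKafka.TOPIC_NAME, "demo-topic");
        runner.setProperty(PublishKafka.KAFKA_KEY, "my-key"); // FlowFile attribute that supplies the record key

        runner.enqueue("value-payload", Map.of("my-key", "some-key-value"));
        runner.run();
        // The published record now carries the key "some-key-value"; if the
        // attribute were absent, the record would be sent unkeyed (null key).
    }
}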
PublishKafka.java
@@ -259,7 +259,6 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER)
             .build();
 
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
@@ -526,18 +525,17 @@ private KafkaRecordConverter getKafkaRecordConverter(final ProcessContex
 
         final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
 
-        if (readerFactory != null && writerFactory != null) {
-            final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
-            final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
-            final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
-
-            final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
-            final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
-            final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
-            final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null))
+        final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
+        final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
+        final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+        final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null))
                 ? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger())
                 : new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding);
 
+        if (readerFactory != null && writerFactory != null) {
+            final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
             if (publishStrategy == PublishStrategy.USE_WRAPPER) {
                 return new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, maxMessageSize, getLogger());
             } else {
@@ -551,7 +549,7 @@ private KafkaRecordConverter getKafkaRecordConverter(final ProcessContex
                 return new DelimitedStreamKafkaRecordConverter(demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory);
             }
 
-        return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory);
+        return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory, keyFactory);
     }


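The reordering above matters because AttributeKeyFactory, not shown in this diff, is what turns the Kafka Key property into record-key bytes for the non-record paths. A rough, hypothetical sketch of that behavior, inferred only from the two call sites visible here (the constructor taking kafkaKeyAttribute and keyAttributeEncoding, and keyFactory.getKey(attributes, null)); the real class, the getKey signature, and the "utf-8"/"hex" encoding values are all assumptions to verify against the NiFi sources:

import java.nio.charset.StandardCharsets;
import java.util.HexFormat;
import java.util.Map;

// Hypothetical stand-ins for the real NiFi types.
interface KeyFactory {
    byte[] getKey(Map<String, String> attributes, Object context);
}

final class AttributeKeyFactorySketch implements KeyFactory {
    private final String keyAttribute; // value of the KAFKA_KEY property, e.g. "my-key"
    private final String keyEncoding;  // assumed "utf-8" or "hex", per KEY_ATTRIBUTE_ENCODING

    AttributeKeyFactorySketch(final String keyAttribute, final String keyEncoding) {
        this.keyAttribute = keyAttribute;
        this.keyEncoding = keyEncoding;
    }

    @Override
    public byte[] getKey(final Map<String, String> attributes, final Object context) {
        if (keyAttribute == null) {
            return null; // no key configured: record stays unkeyed
        }
        final String value = attributes.get(keyAttribute);
        if (value == null) {
            return null; // attribute absent on this FlowFile
        }
        return "hex".equalsIgnoreCase(keyEncoding)
                ? HexFormat.of().parseHex(value)          // attribute holds a hex-encoded key
                : value.getBytes(StandardCharsets.UTF_8); // default: UTF-8 bytes of the attribute value
    }
}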
FlowFileStreamKafkaRecordConverter.java
@@ -18,6 +18,7 @@
 
 import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
 import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
 import org.apache.nifi.kafka.service.api.record.KafkaRecord;
 
 import java.io.ByteArrayOutputStream;
@@ -34,10 +35,12 @@
 public class FlowFileStreamKafkaRecordConverter implements KafkaRecordConverter {
     final int maxMessageSize;
     final HeadersFactory headersFactory;
+    final KeyFactory keyFactory;
 
-    public FlowFileStreamKafkaRecordConverter(final int maxMessageSize, final HeadersFactory headersFactory) {
+    public FlowFileStreamKafkaRecordConverter(final int maxMessageSize, final HeadersFactory headersFactory, final KeyFactory keyFactory) {
         this.maxMessageSize = maxMessageSize;
         this.headersFactory = headersFactory;
+        this.keyFactory = keyFactory;
     }
 
     @Override
@@ -50,7 +53,7 @@ public Iterator<KafkaRecord> convert(final Map<String, String> attributes, final
             recordBytes = baos.toByteArray();
         }
 
-        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, recordBytes, headersFactory.getHeaders(attributes));
+        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, keyFactory.getKey(attributes, null), recordBytes, headersFactory.getHeaders(attributes));
         return List.of(kafkaRecord).iterator();
     }
 }
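End to end, the fourth KafkaRecord constructor argument, previously hard-coded to null, is now whatever the key factory resolves from the FlowFile attributes. Exercising the hypothetical sketch above:

import java.nio.charset.StandardCharsets;
import java.util.Map;

class KeyResolutionDemo {
    public static void main(final String[] args) {
        final KeyFactory keys = new AttributeKeyFactorySketch("my-key", "utf-8");

        // Attribute present: UTF-8 bytes of "some-key-value", matching the IT assertion.
        final byte[] keyed = keys.getKey(Map.of("my-key", "some-key-value"), null);

        // Attribute absent: null key, i.e. the pre-change behavior for every FlowFile.
        final byte[] unkeyed = keys.getKey(Map.of("other", "x"), null);

        System.out.println(new String(keyed, StandardCharsets.UTF_8) + " / " + unkeyed); // some-key-value / null
    }
}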
