Skip to content

Commit

Permalink
[FLINK-36180] Fix batch message data loss (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenbingshen authored Oct 9, 2024
1 parent 787f5e4 commit 34f217b
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
Expand Down Expand Up @@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
if (include) {
return messageIdImpl;
} else {
// if the message is batched, should return next single message in current batch.
if (messageIdImpl.getBatchIndex() >= 0
&& messageIdImpl.getBatchSize() > 0
&& messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) {
return new BatchMessageIdImpl(
messageIdImpl.getLedgerId(),
messageIdImpl.getEntryId(),
messageIdImpl.getPartitionIndex(),
messageIdImpl.getBatchIndex() + 1,
messageIdImpl.getBatchSize(),
messageIdImpl.getAckSet());
}

// if the (ledgerId, entryId + 1) is not valid
// pulsar broker will automatically set the cursor to the next valid message
return new MessageIdImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
Expand Down Expand Up @@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
MessageId latestConsumedId = registeredSplit.getLatestConsumedId();

if (latestConsumedId != null) {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
if (latestConsumedId instanceof BatchMessageIdImpl) {
LOG.info(
"Reset subscription position by the checkpoint {}, batchSize {}",
latestConsumedId,
((BatchMessageIdImpl) latestConsumedId).getBatchSize());
} else {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
}
try {
CursorPosition cursorPosition;
if (latestConsumedId == MessageId.latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -244,7 +246,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
PulsarPartitionSplitReader splitReader = splitReader();
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
Expand All @@ -263,6 +265,37 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
fetchedMessages(splitReader, 1, true);
}

@Test
void consumeBatchMessageFromRecover() throws Exception {
PulsarPartitionSplitReader splitReader = splitReader();
String topicName = randomAlphabetic(10);

int numRecords = 20;
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), numRecords, true);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
.admin()
.topics()
.getLastMessageId(topicNameWithPartition(topicName, 0));
// Pretend that we consumed the 2th message of the last batch Entry
int lastConsumedBatchIndex = 2;
BitSet ackSet = new BitSet(numRecords);
ackSet.set(0, numRecords);
BatchMessageIdImpl batchMessageId =
new BatchMessageIdImpl(
lastMessageId.getLedgerId(),
lastMessageId.getEntryId(),
lastMessageId.getPartitionIndex(),
lastConsumedBatchIndex,
numRecords,
ackSet);
int expectedCount = numRecords - lastConsumedBatchIndex - 1;
// when recover, use exclusive startCursor
handleSplit(splitReader, topicName, 0, batchMessageId);
fetchedMessages(splitReader, expectedCount, true);
}

/** Create a split reader with max message 1, fetch timeout 1s. */
private PulsarPartitionSplitReader splitReader() {
return new PulsarPartitionSplitReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,6 @@ public void testPulsarMaxwellChangelogSource() throws Exception {
}

private void writeRecordsToPulsar(String topic, List<String> lines) throws Exception {
pulsar.operator().sendMessages(topic, Schema.STRING, lines);
pulsar.operator().sendMessages(topic, Schema.STRING, lines, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ void sendMessageWithPropertiesAndReadPropertiesMetadata() throws Exception {
properties.put("key1", "value1");
properties.put("key2", "value2");
try (Producer<String> producer =
pulsar.operator().createProducer(sourceTopic, Schema.STRING)) {
pulsar.operator().createProducer(sourceTopic, Schema.STRING, false)) {
producer.newMessage().value(value).properties(properties).send();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -146,7 +148,7 @@ public void setupTopic(String topic) throws Exception {
*/
public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier)
throws Exception {
setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION);
setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION, false);
}

/**
Expand All @@ -159,7 +161,11 @@ public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier)
* @param numRecordsPerSplit The number of records for a partition.
*/
public <T> void setupTopic(
String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit)
String topic,
Schema<T> schema,
Supplier<T> supplier,
int numRecordsPerSplit,
boolean enableBatch)
throws Exception {
String topicName = topicName(topic);
createTopic(topicName, DEFAULT_PARTITIONS);
Expand All @@ -170,7 +176,7 @@ public <T> void setupTopic(
List<T> messages =
Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList());

sendMessages(partitionName, schema, messages);
sendMessages(partitionName, schema, messages, enableBatch);
}
}

Expand Down Expand Up @@ -250,7 +256,7 @@ public List<TopicPartition> topicInfo(String topic) throws Exception {
* @return message id.
*/
public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) throws Exception {
List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message));
List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message), false);
checkArgument(messageIds.size() == 1);

return messageIds.get(0);
Expand All @@ -268,7 +274,8 @@ public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) thro
*/
public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message)
throws Exception {
List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message));
List<MessageId> messageIds =
sendMessages(topic, schema, key, singletonList(message), false);
checkArgument(messageIds.size() == 1);

return messageIds.get(0);
Expand All @@ -283,9 +290,10 @@ public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T m
* @param <T> The type of the record.
* @return message id.
*/
public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages)
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, Collection<T> messages, boolean enableBatch)
throws Exception {
return sendMessages(topic, schema, null, messages);
return sendMessages(topic, schema, null, messages, enableBatch);
}

/**
Expand All @@ -299,16 +307,20 @@ public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collecti
* @return message id.
*/
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, String key, Collection<T> messages) throws Exception {
try (Producer<T> producer = createProducer(topic, schema)) {
String topic, Schema<T> schema, String key, Collection<T> messages, boolean enableBatch)
throws Exception {
try (Producer<T> producer = createProducer(topic, schema, enableBatch)) {
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (T message : messages) {
TypedMessageBuilder<T> builder = producer.newMessage().value(message);
if (!Strings.isNullOrEmpty(key)) {
builder.key(key);
}
MessageId messageId = builder.send();
messageIds.add(messageId);
final CompletableFuture<MessageId> messageIdCompletableFuture = builder.sendAsync();
messageIdCompletableFuture.whenComplete(
(messageId, ignore) -> {
messageIds.add(messageId);
});
}
producer.flush();
return messageIds;
Expand Down Expand Up @@ -479,12 +491,15 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) throws
}
}

public <T> Producer<T> createProducer(String topic, Schema<T> schema) throws Exception {
public <T> Producer<T> createProducer(String topic, Schema<T> schema, boolean enableBatch)
throws Exception {
return client().newProducer(schema)
.topic(topic)
.enableBatching(false)
.enableBatching(enableBatch)
.enableMultiSchema(true)
.accessMode(Shared)
.batchingMaxPublishDelay(
10, TimeUnit.SECONDS) // Give enough time to assemble the batch
.create();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public void writeRecords(List<String> records) {
try {
// Send messages with the key we don't need.
List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords, false);

// Send messages with the given key.
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records, false);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PulsarPartitionDataWriter(
@Override
public void writeRecords(List<T> records) {
try {
operator.sendMessages(fullTopicName, schema, records);
operator.sendMessages(fullTopicName, schema, records, false);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
Expand Down

0 comments on commit 34f217b

Please sign in to comment.