Skip to content

Commit

Permalink
merge two props into one
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 24, 2024
1 parent f949cdf commit a4c641b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -43,7 +42,6 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand Down Expand Up @@ -589,24 +587,23 @@ private void sendSendReceiptResponseRepl() {
return;
}
// Case-2: is a repl message.
String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SOURCE_LID));
String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SOURCE_EID));
if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) {
Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
+ " messages props were are invalid. producer={}. supportsDedupReplV2: {},"
+ " sequence-id {}, prop-{}: {}, prop-{}: {}",
+ " sequence-id {}, prop-{}: not in expected format",
producer.topic.getName(), producer.producerName,
supportsDedupReplV2(), getSequenceId(),
MSG_PROP_REPL_SOURCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SOURCE_EID, replSequenceEIdStr);
MSG_PROP_REPL_SOURCE_POSITION);
producer.cnx.getCommandSender().sendSendError(producer.producerId,
Math.max(highestSequenceId, sequenceId),
ServerError.PersistenceError, "Message can not determine whether the message is"
+ " duplicated due to the acquired messages props were are invalid");
return;
}
Long replSequenceLId = Long.valueOf(replSequenceLIdStr);
Long replSequenceEId = Long.valueOf(replSequenceEIdStr);
long[] positionPair = (long[]) positionPairObj;
long replSequenceLId = positionPair[0];
long replSequenceEId = positionPair[1];
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId,
replSequenceEId, ledgerId, entryId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -197,10 +196,8 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
// Add props for sequence checking.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_LID)
.setValue(Long.valueOf(entry.getLedgerId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_EID)
.setValue(Long.valueOf(entry.getEntryId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
.setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId()));
msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.Iterator;
Expand Down Expand Up @@ -381,56 +380,57 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);

Long replSequenceLId = null;
Long replSequenceEId = null;
List<KeyValue> kvPairList = md.getPropertiesList();
for (KeyValue kvPair : kvPairList) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_LID)) {
if (StringUtils.isNumeric(kvPair.getValue())) {
replSequenceLId = Long.valueOf(kvPair.getValue());
publishContext.setProperty(MSG_PROP_REPL_SOURCE_LID, replSequenceLId);
} else {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) {
if (!kvPair.getValue().contains(":")) {
log.warn("[{}] Unexpected {}: {}", publishContext.getProducerName(),
MSG_PROP_REPL_SOURCE_POSITION,
kvPair.getValue());
break;
}
}
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_EID)) {
if (StringUtils.isNumeric(kvPair.getValue())) {
replSequenceEId = Long.valueOf(kvPair.getValue());
publishContext.setProperty(MSG_PROP_REPL_SOURCE_EID, replSequenceEId);
} else {
String[] ledgerIdAndEntryId = kvPair.getValue().split(":");
if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0])
|| !StringUtils.isNumeric(ledgerIdAndEntryId[1])) {
log.warn("[{}] Unexpected {}: {}", publishContext.getProducerName(),
MSG_PROP_REPL_SOURCE_POSITION,
kvPair.getValue());
break;
}
}
if (replSequenceLId != null && replSequenceEId != null) {
long[] positionPair = new long[]{Long.valueOf(ledgerIdAndEntryId[0]).longValue(),
Long.valueOf(ledgerIdAndEntryId[1]).longValue()};
publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair);
break;
}
}
}
}

public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) {
Long replSequenceLId = (Long) publishContext.getProperty(MSG_PROP_REPL_SOURCE_LID);
Long replSequenceEId = (Long) publishContext.getProperty(MSG_PROP_REPL_SOURCE_EID);
if (replSequenceLId == null || replSequenceEId == null) {
Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages"
+ " props were are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " prop-{}: {}, prop-{}: {}",
+ " prop-{}: not in expected format",
topic.getName(), publishContext.getProducerName(),
publishContext.supportsDedupReplV2(), publishContext.getSequenceId(),
MSG_PROP_REPL_SOURCE_LID, replSequenceLId,
MSG_PROP_REPL_SOURCE_EID, replSequenceEId);
MSG_PROP_REPL_SOURCE_POSITION);
return MessageDupStatus.Unknown;
}

long[] positionPair = (long[]) positionPairObj;
long replSequenceLId = positionPair[0];
long replSequenceEId = positionPair[1];

String lastSequenceLIdKey = publishContext.getProducerName() + "_LID";
String lastSequenceEIdKey = publishContext.getProducerName() + "_EID";
synchronized (highestSequencedPushed) {
Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey);
Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey);
if (lastSequenceLIdPushed != null && lastSequenceEIdPushed != null
&& (replSequenceLId.compareTo(lastSequenceLIdPushed) < 0
|| (replSequenceLId.compareTo(lastSequenceLIdPushed) == 0
&& replSequenceEId.compareTo(lastSequenceEIdPushed) <= 0))) {
&& (replSequenceLId < lastSequenceLIdPushed.longValue()
|| (replSequenceLId == lastSequenceLIdPushed.longValue()
&& replSequenceEId <= lastSequenceEIdPushed.longValue()))) {
if (log.isDebugEnabled()) {
log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest publishing"
+ " in-progress {}:{}",
Expand All @@ -454,9 +454,9 @@ public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf
replSequenceEId, lastSequenceLIdPersisted, lastSequenceEIdPersisted);
}
if (lastSequenceLIdPersisted != null && lastSequenceEIdPersisted != null
&& (replSequenceLId.compareTo(lastSequenceLIdPersisted) < 0
|| (replSequenceLId.compareTo(lastSequenceLIdPersisted) == 0
&& replSequenceEId.compareTo(lastSequenceEIdPersisted) <= 0))) {
&& (replSequenceLId < lastSequenceLIdPersisted.longValue()
|| (replSequenceLId == lastSequenceLIdPersisted.longValue()
&& replSequenceEId <= lastSequenceEIdPersisted))) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
Expand Down Expand Up @@ -546,21 +546,20 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit
}

public void recordMessagePersistedRepl(PublishContext publishContext, Position position) {
String replSequenceLIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SOURCE_LID));
String replSequenceEIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SOURCE_EID));
if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) {
Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])) {
log.error("[{}] Can not persist highest sequence-id due to the acquired messages"
+ " props are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " prop-{}: {}, prop-{}: {}",
+ " prop-{}: not in expected format",
topic.getName(), publishContext.getProducerName(),
publishContext.supportsDedupReplV2(), publishContext.getSequenceId(),
MSG_PROP_REPL_SOURCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SOURCE_EID, replSequenceEIdStr);
MSG_PROP_REPL_SOURCE_POSITION);
recordMessagePersistedNormal(publishContext, position);
return;
}
Long replSequenceLId = Long.valueOf(replSequenceLIdStr);
Long replSequenceEId = Long.valueOf(replSequenceEIdStr);
long[] positionPair = (long[]) positionPairObj;
long replSequenceLId = positionPair[0];
long replSequenceEId = positionPair[1];
String lastSequenceLIdKey = publishContext.getProducerName() + "_LID";
String lastSequenceEIdKey = publishContext.getProducerName() + "_EID";
highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.pulsar.broker.service.persistent;


import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION;
import io.netty.buffer.ByteBuf;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -101,10 +100,8 @@ protected boolean replicateEntries(List<Entry> entries) {

msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1));
// Add props for sequence checking.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_LID)
.setValue(Long.valueOf(entry.getLedgerId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_EID)
.setValue(Long.valueOf(entry.getEntryId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION)
.setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId()));

headersAndPayload.retain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
@Slf4j
public class GeoReplicationProducerImpl extends ProducerImpl{

public static final String MSG_PROP_REPL_SOURCE_LID = "__MSG_PROP_REPL_SOURCE_LID";
public static final String MSG_PROP_REPL_SOURCE_EID = "__MSG_PROP_REPL_SOURCE_EID";
public static final String MSG_PROP_REPL_SOURCE_POSITION = "__MSG_PROP_REPL_SOURCE_POSITION";
public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER";

private long lastPersistedSourceLedgerId;
Expand Down Expand Up @@ -86,21 +85,17 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
Long pendingEId = null;
List<KeyValue> kvPairList = op.msg.getMessageBuilder().getPropertiesList();
for (KeyValue kvPair : kvPairList) {
if (MSG_PROP_REPL_SOURCE_LID.equals(kvPair.getKey())) {
if (StringUtils.isNumeric(kvPair.getValue())) {
pendingLId = Long.valueOf(kvPair.getValue());
} else {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_POSITION)) {
if (!kvPair.getValue().contains(":")) {
break;
}
}
if (MSG_PROP_REPL_SOURCE_EID.equals(kvPair.getKey())) {
if (StringUtils.isNumeric(kvPair.getValue())) {
pendingEId = Long.valueOf(kvPair.getValue());
} else {
String[] ledgerIdAndEntryId = kvPair.getValue().split(":");
if (ledgerIdAndEntryId.length != 2 || !StringUtils.isNumeric(ledgerIdAndEntryId[0])
|| !StringUtils.isNumeric(ledgerIdAndEntryId[1])) {
break;
}
}
if (pendingLId != null && pendingEId != null) {
pendingLId = Long.valueOf(ledgerIdAndEntryId[0]);
pendingEId = Long.valueOf(ledgerIdAndEntryId[1]);
break;
}
}
Expand Down

0 comments on commit a4c641b

Please sign in to comment.