Skip to content

Commit

Permalink
rename supportsDedupReplV2 -> supportsReplDedupByLidAndEid
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 24, 2024
1 parent a4c641b commit 5340223
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
return true;
}

private boolean isSupportsDedupReplV2() {
private boolean isSupportsReplDedupByLidAndEid() {
// Non-Persistent topic does not have ledger id or entry id, so it does not support.
return cnx.isClientSupportsDedupReplV2() && topic.isPersistent();
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsDedupReplV2());
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand All @@ -299,7 +299,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
int batchSize, boolean isChunked, boolean isMarker, Position position) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position, isSupportsDedupReplV2());
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down Expand Up @@ -394,7 +394,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
private int batchSize;
private boolean chunked;
private boolean isMarker;
private boolean supportsDedupReplV2;
private boolean supportsReplDedupByLidAndEid;

private long startTimeNs;

Expand Down Expand Up @@ -481,8 +481,8 @@ public long getOriginalSequenceId() {
}

@Override
public boolean supportsDedupReplV2() {
return supportsDedupReplV2;
public boolean supportsReplDedupByLidAndEid() {
return supportsReplDedupByLidAndEid;
}

@Override
Expand Down Expand Up @@ -559,7 +559,7 @@ public void run() {
// stats
producer.stats.recordMsgIn(batchSize, msgSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
if (producer.isRemoteOrShadow && producer.isSupportsDedupReplV2()) {
if (producer.isRemoteOrShadow && producer.isSupportsReplDedupByLidAndEid()) {
sendSendReceiptResponseRepl();
} else {
// Repl V1 is the same as normal for this handling.
Expand Down Expand Up @@ -590,10 +590,10 @@ private void sendSendReceiptResponseRepl() {
Object positionPairObj = getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
+ " messages props were are invalid. producer={}. supportsDedupReplV2: {},"
+ " messages props were are invalid. producer={}. supportsReplDedupByLidAndEid: {},"
+ " sequence-id {}, prop-{}: not in expected format",
producer.topic.getName(), producer.producerName,
supportsDedupReplV2(), getSequenceId(),
supportsReplDedupByLidAndEid(), getSequenceId(),
MSG_PROP_REPL_SOURCE_POSITION);
producer.cnx.getCommandSender().sendSendError(producer.producerId,
Math.max(highestSequenceId, sequenceId),
Expand All @@ -615,7 +615,7 @@ private void sendSendReceiptResponseNormal() {

static MessagePublishContext get(Producer producer, long sequenceId, int msgSize, int batchSize,
boolean chunked, long startTimeNs, boolean isMarker, Position position,
boolean supportsDedupReplV2) {
boolean supportsReplDedupByLidAndEid) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
Expand All @@ -626,7 +626,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.supportsDedupReplV2 = supportsDedupReplV2;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
Expand All @@ -637,7 +637,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, int msgSize,
int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
boolean supportsDedupReplV2) {
boolean supportsReplDedupByLidAndEid) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
Expand All @@ -649,7 +649,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.supportsDedupReplV2 = supportsDedupReplV2;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
Expand Down Expand Up @@ -868,7 +868,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
cnx.isClientSupportsDedupReplV2());
cnx.isClientSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ default void setEntryTimestamp(long entryTimestamp) {

}

default boolean supportsDedupReplV2() {
default boolean supportsReplDedupByLidAndEid() {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public interface TransportCnx {

FeatureFlags getFeatures();

default boolean isClientSupportsDedupReplV2() {
return getFeatures() != null && getFeatures().hasSupportsDedupReplV2() && getFeatures().isSupportsDedupReplV2();
default boolean isClientSupportsReplDedupByLidAndEid() {
return getFeatures() != null && getFeatures().hasSupportsReplDedupByLidAndEid()
&& getFeatures().isSupportsReplDedupByLidAndEid();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
return MessageDupStatus.NotDup;
}
if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) {
if (!publishContext.supportsDedupReplV2()){
if (!publishContext.supportsReplDedupByLidAndEid()){
return isDuplicateReplV1(publishContext, headersAndPayload);
} else {
return isDuplicateReplV2(publishContext, headersAndPayload);
Expand Down Expand Up @@ -410,10 +410,10 @@ public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf
Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages"
+ " props were are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " props were are invalid. producer={}. supportsReplDedupByLidAndEid: {}, sequence-id {},"
+ " prop-{}: not in expected format",
topic.getName(), publishContext.getProducerName(),
publishContext.supportsDedupReplV2(), publishContext.getSequenceId(),
publishContext.supportsReplDedupByLidAndEid(), publishContext.getSequenceId(),
MSG_PROP_REPL_SOURCE_POSITION);
return MessageDupStatus.Unknown;
}
Expand Down Expand Up @@ -538,7 +538,8 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit
if (!isEnabled() || publishContext.isMarkerMessage()) {
return;
}
if (publishContext.getProducerName().startsWith(replicatorPrefix) && publishContext.supportsDedupReplV2()) {
if (publishContext.getProducerName().startsWith(replicatorPrefix)
&& publishContext.supportsReplDedupByLidAndEid()) {
recordMessagePersistedRepl(publishContext, position);
} else {
recordMessagePersistedNormal(publishContext, position);
Expand All @@ -549,10 +550,10 @@ public void recordMessagePersistedRepl(PublishContext publishContext, Position p
Object positionPairObj = publishContext.getProperty(MSG_PROP_REPL_SOURCE_POSITION);
if (positionPairObj == null || !(positionPairObj instanceof long[])) {
log.error("[{}] Can not persist highest sequence-id due to the acquired messages"
+ " props are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " props are invalid. producer={}. supportsReplDedupByLidAndEid: {}, sequence-id {},"
+ " prop-{}: not in expected format",
topic.getName(), publishContext.getProducerName(),
publishContext.supportsDedupReplV2(), publishContext.getSequenceId(),
publishContext.supportsReplDedupByLidAndEid(), publishContext.getSequenceId(),
MSG_PROP_REPL_SOURCE_POSITION);
recordMessagePersistedNormal(publishContext, position);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,66 +169,66 @@ protected Runnable injectReplicatorClientCnx(
public Object[][] deduplicationArgs() {
return new Object[][] {
{true/* inject repeated publishing*/, 1/* repeated messages window */,
true /* supportsDedupReplV2 */, false/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
{true/* inject repeated publishing*/, 2/* repeated messages window */,
true /* supportsDedupReplV2 */, false/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
{true/* inject repeated publishing*/, 3/* repeated messages window */,
true /* supportsDedupReplV2 */, false/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
{true/* inject repeated publishing*/, 4/* repeated messages window */,
true /* supportsDedupReplV2 */, false/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
{true/* inject repeated publishing*/, 5/* repeated messages window */,
true /* supportsDedupReplV2 */, false/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
{true/* inject repeated publishing*/, 10/* repeated messages window */,
true /* supportsDedupReplV2 */, false/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
// ===== multi schema
{true/* inject repeated publishing*/, 1/* repeated messages window */,
true /* supportsDedupReplV2 */, true/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
{true/* inject repeated publishing*/, 2/* repeated messages window */,
true /* supportsDedupReplV2 */, true/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
{true/* inject repeated publishing*/, 3/* repeated messages window */,
true /* supportsDedupReplV2 */, true/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
{true/* inject repeated publishing*/, 4/* repeated messages window */,
true /* supportsDedupReplV2 */, true/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
{true/* inject repeated publishing*/, 5/* repeated messages window */,
true /* supportsDedupReplV2 */, true/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
{true/* inject repeated publishing*/, 10/* repeated messages window */,
true /* supportsDedupReplV2 */, true/* multi schemas */},
true /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
// ===== Compatability "source-cluster: old, target-cluster: new".
{false/* inject repeated publishing*/, 0/* repeated messages window */,
false /* supportsDedupReplV2 */, false/* multi schemas */},
false /* supportsReplDedupByLidAndEid */, false/* multi schemas */},
{false/* inject repeated publishing*/, 0/* repeated messages window */,
false /* supportsDedupReplV2 */, true/* multi schemas */},
false /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
{true/* inject repeated publishing*/, 3/* repeated messages window */,
false /* supportsDedupReplV2 */, true/* multi schemas */},
false /* supportsReplDedupByLidAndEid */, true/* multi schemas */},
};
}

// TODO
// - Review the code to confirm that multi source-brokers can work when the source topic switch.
@Test(timeOut = 360 * 1000, dataProvider = "deduplicationArgs")
public void testDeduplication(final boolean injectRepeatedPublish, final int repeatedMessagesWindow,
final boolean supportsDedupReplV2, boolean multiSchemas) throws Exception {
final boolean supportsReplDedupByLidAndEid, boolean multiSchemas) throws Exception {
// 0. Inject a mechanism that duplicate all Send-Command for the replicator.
final List<ByteBufPair> duplicatedMsgs = new ArrayList<>();
Runnable taskToClearInjection = injectReplicatorClientCnx(
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {

@Override
protected ByteBuf newConnectCommand() throws Exception {
if (supportsDedupReplV2) {
if (supportsReplDedupByLidAndEid) {
return super.newConnectCommand();
}
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
BaseCommand cmd = Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), authData,
this.protocolVersion, clientVersion, proxyToTargetBrokerAddress, null, null, null, null);
cmd.getConnect().getFeatureFlags().setSupportsDedupReplV2(false);
cmd.getConnect().getFeatureFlags().setSupportsReplDedupByLidAndEid(false);
return Commands.serializeWithSize(cmd);
}

@Override
public boolean isBrokerSupportsDedupReplV2() {
return supportsDedupReplV2;
public boolean isBrokerSupportsReplDedupByLidAndEid() {
return supportsReplDedupByLidAndEid;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public class ClientCnx extends PulsarHandler {
@Getter
private boolean supportsGetPartitionedMetadataWithoutAutoCreation;
@Getter
private boolean brokerSupportsDedupReplV2;
private boolean brokerSupportsReplDedupByLidAndEid;

/** Idle stat. **/
@Getter
Expand Down Expand Up @@ -409,8 +409,8 @@ protected void handleConnected(CommandConnected connected) {
supportsGetPartitionedMetadataWithoutAutoCreation =
connected.hasFeatureFlags()
&& connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation();
brokerSupportsDedupReplV2 =
connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsDedupReplV2();
brokerSupportsReplDedupByLidAndEid =
connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsReplDedupByLidAndEid();

// set remote protocol version to the correct version before we complete the connection future
setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public GeoReplicationProducerImpl(PulsarClientImpl client, String topic,
isPersistentTopic = TopicName.get(topic).isPersistent();
}

private boolean isBrokerSupportsDedupReplV2(ClientCnx cnx) {
private boolean isBrokerSupportsReplDedupByLidAndEid(ClientCnx cnx) {
// Non-Persistent topic does not have ledger id or entry id, so it does not support.
return cnx.isBrokerSupportsDedupReplV2() && isPersistentTopic;
return cnx.isBrokerSupportsReplDedupByLidAndEid() && isPersistentTopic;
}

@Override
protected void ackReceived(ClientCnx cnx, long seq, long highSeq, long ledgerId, long entryId) {
if (!isBrokerSupportsDedupReplV2(cnx)) {
if (!isBrokerSupportsReplDedupByLidAndEid(cnx)) {
// Repl V1 is the same as normal for this handling.
super.ackReceived(cnx, seq, highSeq, ledgerId, entryId);
return;
Expand Down
Loading

0 comments on commit 5340223

Please sign in to comment.