@@ -299,6 +299,7 @@ class SimpleOperationTracker implements OperationTracker {
299
299
List <ReplicaId > examinedReplicas = new ArrayList <>();
300
300
originatingDcName = originatingDcName == null ? reassignedOriginDc : originatingDcName ;
301
301
int numLocalAndLiveReplicas = 0 ;
302
+ int numRemoteOriginatingDcAndLiveReplicas = 0 ;
302
303
for (ReplicaId replicaId : replicas ) {
303
304
examinedReplicas .add (replicaId );
304
305
String replicaDcName = replicaId .getDataNodeId ().getDatacenterName ();
@@ -310,6 +311,7 @@ class SimpleOperationTracker implements OperationTracker {
310
311
numLocalAndLiveReplicas ++;
311
312
addToBeginningOfPool (replicaId );
312
313
} else if (crossColoEnabled && isOriginatingDcReplica ) {
314
+ numRemoteOriginatingDcAndLiveReplicas ++;
313
315
addToEndOfPool (replicaId );
314
316
} else if (crossColoEnabled ) {
315
317
backupReplicas .addFirst (replicaId );
@@ -349,6 +351,7 @@ class SimpleOperationTracker implements OperationTracker {
349
351
}
350
352
351
353
maybeDeprioritizeLocalBootstrapReplicas (numLocalAndLiveReplicas );
354
+ maybeShuffleWithRemoteReplicas (numLocalAndLiveReplicas , numRemoteOriginatingDcAndLiveReplicas );
352
355
totalReplicaCount = replicaPool .size ();
353
356
354
357
// MockPartitionId.getReplicaIds() is returning a shared reference which may cause race condition.
@@ -614,6 +617,39 @@ void maybeDeprioritizeLocalBootstrapReplicas(int numLocalAndLiveReplicas) {
614
617
}
615
618
}
616
619
620
+ /**
621
+ * For get operations, if there are very few local replicas, then shuffle the pool with remote replicas so that the
622
+ * few (or one) remaining local replicas don't get all the traffic. The threshold for minimum number of local replicas
623
+ * is defined in {@link RouterConfig#routerGetOperationMinLocalReplicaCountToPrioritizeLocal}.
624
+ * @param numLocalAndLiveReplicas the number of local and live replicas.
625
+ * @param numRemoteOriginatingDcAndLiveReplicas the number of remote originating DC and live replicas.
626
+ */
627
+ void maybeShuffleWithRemoteReplicas (int numLocalAndLiveReplicas , int numRemoteOriginatingDcAndLiveReplicas ) {
628
+ if (isGetOperation () && numLocalAndLiveReplicas > 0
629
+ && numLocalAndLiveReplicas <= routerConfig .routerGetOperationMinLocalReplicaCountToPrioritizeLocal ) {
630
+ logger .debug ("Shuffling replicas for {} because there are only {} local and live replicas for {}" ,
631
+ routerOperation , numLocalAndLiveReplicas , partitionId );
632
+ routerMetrics .shuffledWithRemoteReplicasForGetDueToFewLocalReplicas .inc ();
633
+ List <ReplicaId > replicasToReshuffle = new ArrayList <>();
634
+ if (numRemoteOriginatingDcAndLiveReplicas > 0 ) {
635
+ // If the local DC is not the originating DC, we shuffle only with originating DC replicas.
636
+ replicasToReshuffle .addAll (
637
+ replicaPool .subList (0 , numLocalAndLiveReplicas + numRemoteOriginatingDcAndLiveReplicas ));
638
+ } else {
639
+ replicasToReshuffle .addAll (replicaPool );
640
+ }
641
+ Collections .shuffle (replicasToReshuffle );
642
+ ListIterator <ReplicaId > iter = replicaPool .listIterator ();
643
+ for (ReplicaId replicaId : replicasToReshuffle ) {
644
+ iter .next ();
645
+ iter .set (replicaId );
646
+ }
647
+ if (!replicaPool .get (0 ).getDataNodeId ().getDatacenterName ().equals (datacenterName )) {
648
+ routerMetrics .remoteReplicaPrioritizedForGetDueToFewLocalReplicas .inc ();
649
+ }
650
+ }
651
+ }
652
+
617
653
public boolean hasFailed () {
618
654
if (routerOperation == RouterOperation .PutOperation && routerConfig .routerPutUseDynamicSuccessTarget ) {
619
655
return totalReplicaCount - failedCount < Math .max (totalReplicaCount - 1 ,
0 commit comments