Skip to content

Commit

Permalink
[AMQ-8354] Add JMX metric to monitor replication flow control.
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaShupletsov committed Jul 19, 2023
1 parent e1c477a commit 39cb1ef
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void onMessage(Message jmsMessage) {
long nextWarn = start;
try {
while (!memoryUsage.waitForSpace(1000, 90)) {
replicaStatistics.setReplicaReplicationFlowControl(true);
long now = System.currentTimeMillis();
if (now >= nextWarn) {
logger.warn("High memory usage. Pausing replication (paused for: {}s)", (now - start) / 1000);
Expand All @@ -137,6 +138,7 @@ public void onMessage(Message jmsMessage) {
throw new RuntimeException(e);
}
}
replicaStatistics.setReplicaReplicationFlowControl(false);

try {
processMessageWithRetries(message, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ boolean iterateSend() {
long nextWarn = start;
try {
while (!memoryUsage.waitForSpace(1000, 95)) {
replicaStatistics.setSourceReplicationFlowControl(true);
long now = System.currentTimeMillis();
if (now >= nextWarn) {
logger.warn("High memory usage. Pausing replication (paused for: {}s)", (now - start) / 1000);
Expand All @@ -489,6 +490,7 @@ boolean iterateSend() {
throw new RuntimeException(e);
}
}
replicaStatistics.setSourceReplicationFlowControl(false);

iterateSend0();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ReplicaStatistics {
Expand All @@ -33,6 +34,9 @@ public class ReplicaStatistics {
private AtomicLong replicationLag;
private AtomicLong replicaLastProcessedTime;

private AtomicBoolean sourceReplicationFlowControl;
private AtomicBoolean replicaReplicationFlowControl;

public ReplicaStatistics() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
if (tpsCounter == null) {
Expand Down Expand Up @@ -60,6 +64,9 @@ public void reset() {
sourceLastProcessedTime = null;
replicationLag = null;
replicaLastProcessedTime = null;

sourceReplicationFlowControl = null;
replicaReplicationFlowControl = null;
}

public void increaseTpsCounter(long size) {
Expand Down Expand Up @@ -116,4 +123,26 @@ public void setReplicaLastProcessedTime(long replicaLastProcessedTime) {
}
this.replicaLastProcessedTime.set(replicaLastProcessedTime);
}

public AtomicBoolean getSourceReplicationFlowControl() {
return sourceReplicationFlowControl;
}

public void setSourceReplicationFlowControl(boolean sourceReplicationFlowControl) {
if (this.sourceReplicationFlowControl == null) {
this.sourceReplicationFlowControl = new AtomicBoolean();
}
this.sourceReplicationFlowControl.set(sourceReplicationFlowControl);
}

public AtomicBoolean getReplicaReplicationFlowControl() {
return replicaReplicationFlowControl;
}

public void setReplicaReplicationFlowControl(boolean replicaReplicationFlowControl) {
if (this.replicaReplicationFlowControl == null) {
this.replicaReplicationFlowControl = new AtomicBoolean();
}
this.replicaReplicationFlowControl.set(replicaReplicationFlowControl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.activemq.replica.ReplicaStatistics;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ReplicationView implements ReplicationViewMBean {
Expand Down Expand Up @@ -69,4 +70,14 @@ public Long getReplicaWaitTime() {
return Optional.ofNullable(replicaStatistics.getReplicaLastProcessedTime()).map(AtomicLong::get)
.map(v -> System.currentTimeMillis() - v).orElse(null);
}

@Override
public Boolean getSourceReplicationFlowControl() {
return Optional.ofNullable(replicaStatistics.getSourceReplicationFlowControl()).map(AtomicBoolean::get).orElse(null);
}

@Override
public Boolean getReplicaReplicationFlowControl() {
return Optional.ofNullable(replicaStatistics.getReplicaReplicationFlowControl()).map(AtomicBoolean::get).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ public interface ReplicationViewMBean {

@MBeanInfo("Get wait time(if the broker's role is replica)")
Long getReplicaWaitTime();

@MBeanInfo("Flow control is enabled for replication on the source side")
Boolean getSourceReplicationFlowControl();

@MBeanInfo("Flow control is enabled for replication on the replica side")
Boolean getReplicaReplicationFlowControl();
}

0 comments on commit 39cb1ef

Please sign in to comment.