Skip to content

Commit

Permalink
[AMQ-8354] add replication lag and wait time metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaShupletsov committed Jun 3, 2023
1 parent 213802e commit b5b3975
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ public class ReplicaBroker extends MutativeRoleBroker {
private final ReplicaReplicationQueueSupplier queueProvider;
private final ReplicaPolicy replicaPolicy;
private final PeriodAcknowledge periodAcknowledgeCallBack;
private final ReplicaStatistics replicaStatistics;
private ReplicaBrokerEventListener messageListener;
private ScheduledFuture<?> replicationScheduledFuture;
private ScheduledFuture<?> ackPollerScheduledFuture;

public ReplicaBroker(Broker broker, ReplicaRoleManagement management, ReplicaReplicationQueueSupplier queueProvider,
ReplicaPolicy replicaPolicy) {
ReplicaPolicy replicaPolicy, ReplicaStatistics replicaStatistics) {
super(broker, management);
this.queueProvider = queueProvider;
this.replicaPolicy = replicaPolicy;
this.periodAcknowledgeCallBack = new PeriodAcknowledge(replicaPolicy);
this.replicaStatistics = replicaStatistics;
}

@Override
Expand Down Expand Up @@ -126,7 +128,7 @@ private void init(ReplicaRole role) {
}
}
}, replicaPolicy.getReplicaAckPeriod(), replicaPolicy.getReplicaAckPeriod(), TimeUnit.MILLISECONDS);
messageListener = new ReplicaBrokerEventListener(this, queueProvider, periodAcknowledgeCallBack);
messageListener = new ReplicaBrokerEventListener(this, queueProvider, periodAcknowledgeCallBack, replicaStatistics);
}

private void deinitialize() throws JMSException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ReplicaBrokerEventListener implements MessageListener {
private final ConnectionContext connectionContext;
private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
private final PeriodAcknowledge acknowledgeCallback;
private final ReplicaStatistics replicaStatistics;
private final AtomicReference<ReplicaEventRetrier> replicaEventRetrier = new AtomicReference<>();
final ReplicaSequenceStorage sequenceStorage;
private final TransactionBroker transactionBroker;
Expand All @@ -77,10 +78,11 @@ public class ReplicaBrokerEventListener implements MessageListener {
MessageId sequenceMessageId;

ReplicaBrokerEventListener(ReplicaBroker replicaBroker, ReplicaReplicationQueueSupplier queueProvider,
PeriodAcknowledge acknowledgeCallback) {
PeriodAcknowledge acknowledgeCallback, ReplicaStatistics replicaStatistics) {
this.replicaBroker = requireNonNull(replicaBroker);
this.broker = requireNonNull(replicaBroker.getNext());
this.acknowledgeCallback = requireNonNull(acknowledgeCallback);
this.replicaStatistics = replicaStatistics;
connectionContext = broker.getAdminConnectionContext().copy();
connectionContext.setUserName(ReplicaSupport.REPLICATION_PLUGIN_USER_NAME);
connectionContext.setClientId(REPLICATION_CONSUMER_CLIENT_ID);
Expand Down Expand Up @@ -203,6 +205,10 @@ private void processMessage(ActiveMQMessage message, ReplicaEventType eventType,
"Replication event is out of order. Current sequence %s belongs to message with id %s," +
"but the id of the event is %s", sequence, sequenceMessageId, messageId));
}

long currentTime = System.currentTimeMillis();
replicaStatistics.setReplicationLag(currentTime - message.getTimestamp());
replicaStatistics.setReplicaLastProcessedTime(currentTime);
}

private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, Object deserializedData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ public class ReplicaCompactor {
private final ReplicaReplicationQueueSupplier queueProvider;
private final PrefetchSubscription subscription;
private final int additionalMessagesLimit;
private final ReplicaStatistics replicaStatistics;

private final Queue intermediateQueue;

public ReplicaCompactor(Broker broker, ReplicaReplicationQueueSupplier queueProvider, PrefetchSubscription subscription,
int additionalMessagesLimit) {
int additionalMessagesLimit, ReplicaStatistics replicaStatistics) {
this.broker = broker;
this.queueProvider = queueProvider;
this.subscription = subscription;
this.additionalMessagesLimit = additionalMessagesLimit;
this.replicaStatistics = replicaStatistics;

intermediateQueue = broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
.map(DestinationExtractor::extractQueue).orElseThrow();
Expand Down Expand Up @@ -157,6 +159,8 @@ private List<DeliveredMessageReference> compactAndFilter0(ConnectionContext conn
Set<MessageId> messageIds = toDelete.stream().map(dmid -> dmid.messageReference.getMessageId()).collect(Collectors.toSet());
result.removeIf(reference -> messageIds.contains(reference.messageReference.getMessageId()));

replicaStatistics.increaseTpsCounter(toDelete.size());

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ReplicaEvent {
private Map<String, Object> replicationProperties = new HashMap<>();

private Integer version;
private Long timestamp;

ReplicaEvent setTransactionId(TransactionId transactionId) {
this.transactionId = transactionId;
Expand Down Expand Up @@ -64,6 +65,12 @@ ReplicaEvent setVersion(int version) {
return this;
}

ReplicaEvent setTimestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}


TransactionId getTransactionId() {
return transactionId;
}
Expand All @@ -83,4 +90,8 @@ public Map<String, Object> getReplicationProperties() {
public Integer getVersion() {
return version;
}

public Long getTimestamp() {
return timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ public Broker installPlugin(final Broker broker) throws Exception {

logger.info("{} installed, running as {}", ReplicaPlugin.class.getName(), role);

final BrokerService brokerService = broker.getBrokerService();
ReplicaStatistics replicaStatistics = new ReplicaStatistics();

BrokerService brokerService = broker.getBrokerService();
if (brokerService.isUseJmx()) {
replicationView = new ReplicationView(this);
replicationView = new ReplicationView(this, replicaStatistics);
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), replicationView, ReplicationJmxHelper.createJmxName(brokerService));
}

Expand All @@ -96,7 +98,7 @@ public Broker installPlugin(final Broker broker) throws Exception {
interceptors[interceptors.length - 1] = new ReplicaAdvisorySuppressor();
compositeInterceptor.setInterceptors(interceptors);

replicaRoleManagementBroker = new ReplicaRoleManagementBroker(broker, replicaPolicy, role);
replicaRoleManagementBroker = new ReplicaRoleManagementBroker(broker, replicaPolicy, role, replicaStatistics);

return new ReplicaAuthorizationBroker(replicaRoleManagementBroker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ReplicaRoleManagementBroker extends MutableBrokerFilter implements
private final ReplicaPolicy replicaPolicy;
private final ClassLoader contextClassLoader;
private ReplicaRole role;
private final ReplicaStatistics replicaStatistics;
private final ReplicaReplicationQueueSupplier queueProvider;
private final WebConsoleAccessController webConsoleAccessController;
private final ReplicaInternalMessageProducer replicaInternalMessageProducer;
Expand All @@ -59,11 +60,12 @@ public class ReplicaRoleManagementBroker extends MutableBrokerFilter implements
ReplicaBroker replicaBroker;
private ReplicaRoleStorage replicaRoleStorage;

public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, ReplicaRole role) {
public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, ReplicaRole role, ReplicaStatistics replicaStatistics) {
super(broker);
this.broker = broker;
this.replicaPolicy = replicaPolicy;
this.role = role;
this.replicaStatistics = replicaStatistics;

contextClassLoader = Thread.currentThread().getContextClassLoader();

Expand All @@ -77,7 +79,7 @@ public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, R
ReplicationMessageProducer replicationMessageProducer =
new ReplicationMessageProducer(replicaInternalMessageProducer, queueProvider);
ReplicaSequencer replicaSequencer = new ReplicaSequencer(broker, queueProvider, replicaInternalMessageProducer,
replicationMessageProducer, replicaPolicy);
replicationMessageProducer, replicaPolicy, replicaStatistics);

sourceBroker = buildSourceBroker(replicationMessageProducer, replicaSequencer, queueProvider);
replicaBroker = buildReplicaBroker(queueProvider);
Expand Down Expand Up @@ -131,6 +133,7 @@ public synchronized void switchRole(ReplicaRole role, boolean force) throws Exce
}

public void onStopSuccess() throws Exception {
replicaStatistics.reset();
MutativeRoleBroker nextByRole = getNextByRole();
nextByRole.startAfterRoleChange();
setNext(nextByRole);
Expand Down Expand Up @@ -189,7 +192,7 @@ private ReplicaSourceBroker buildSourceBroker(ReplicationMessageProducer replica
}

private ReplicaBroker buildReplicaBroker(ReplicaReplicationQueueSupplier queueProvider) {
return new ReplicaBroker(broker, this, queueProvider, replicaPolicy);
return new ReplicaBroker(broker, this, queueProvider, replicaPolicy, replicaStatistics);
}

private void addInterceptor4CompositeQueues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class ReplicaSequencer {
private final ReplicaAckHelper replicaAckHelper;
private final ReplicaPolicy replicaPolicy;
private final ReplicaBatcher replicaBatcher;
private final ReplicaStatistics replicaStatistics;

ReplicaCompactor replicaCompactor;
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
Expand All @@ -107,14 +108,16 @@ public class ReplicaSequencer {

public ReplicaSequencer(Broker broker, ReplicaReplicationQueueSupplier queueProvider,
ReplicaInternalMessageProducer replicaInternalMessageProducer,
ReplicationMessageProducer replicationMessageProducer, ReplicaPolicy replicaPolicy) {
ReplicationMessageProducer replicationMessageProducer, ReplicaPolicy replicaPolicy,
ReplicaStatistics replicaStatistics) {
this.broker = broker;
this.queueProvider = queueProvider;
this.replicaInternalMessageProducer = replicaInternalMessageProducer;
this.replicationMessageProducer = replicationMessageProducer;
this.replicaAckHelper = new ReplicaAckHelper(broker);
this.replicaPolicy = replicaPolicy;
this.replicaBatcher = new ReplicaBatcher(replicaPolicy);
this.replicaStatistics = replicaStatistics;
}

void initialize() throws Exception {
Expand Down Expand Up @@ -154,7 +157,7 @@ void initialize() throws Exception {
subscription = (PrefetchSubscription) broker.addConsumer(subscriptionConnectionContext, consumerInfo);

replicaCompactor = new ReplicaCompactor(broker, queueProvider, subscription,
replicaPolicy.getCompactorAdditionalMessagesLimit());
replicaPolicy.getCompactorAdditionalMessagesLimit(), replicaStatistics);

intermediateQueue.iterate();
String savedSequences = sequenceStorage.initialize(subscriptionConnectionContext);
Expand Down Expand Up @@ -313,6 +316,7 @@ List<MessageReference> acknowledge(ConsumerBrokerExchange consumerExchange, Mess
}
List<String> messageIds = new ArrayList<>();
List<String> sequenceMessageIds = new ArrayList<>();
long timestamp = messagesToAck.get(0).getMessage().getTimestamp();
for (MessageReference reference : messagesToAck) {
ActiveMQMessage message = (ActiveMQMessage) reference.getMessage();
List<String> messageIdsProperty;
Expand All @@ -323,6 +327,8 @@ List<MessageReference> acknowledge(ConsumerBrokerExchange consumerExchange, Mess
}
messageIds.addAll(messageIdsProperty);
sequenceMessageIds.add(messageIdsProperty.get(0));

timestamp = Math.max(timestamp, message.getTimestamp());
}

broker.acknowledge(consumerExchange, ack);
Expand All @@ -332,6 +338,10 @@ List<MessageReference> acknowledge(ConsumerBrokerExchange consumerExchange, Mess
sequenceMessageIds.forEach(sequenceMessageToAck::addLast);
}

long currentTime = System.currentTimeMillis();
replicaStatistics.setTotalReplicationLag(currentTime - timestamp);
replicaStatistics.setSourceLastProcessedTime(currentTime);

asyncAckWakeup();

return messagesToAck;
Expand Down Expand Up @@ -430,6 +440,8 @@ private void iterateAck0() {

broker.commitTransaction(connectionContext, transactionId, true);

replicaStatistics.increaseTpsCounter(messages.size());

synchronized (messageToAck) {
messageToAck.removeAll(messages);
sequenceMessageToAck.removeAll(sequenceMessages);
Expand Down Expand Up @@ -575,6 +587,7 @@ private BigInteger enqueueReplicaEvent(ConnectionContext connectionContext, List

List<String> messageIds = new ArrayList<>();
List<DataStructure> messages = new ArrayList<>();
long timestamp = batch.get(0).getMessage().getTimestamp();
for (MessageReference reference : batch) {
ActiveMQMessage originalMessage = (ActiveMQMessage) reference.getMessage();
ActiveMQMessage message = (ActiveMQMessage) originalMessage.copy();
Expand All @@ -589,12 +602,16 @@ private BigInteger enqueueReplicaEvent(ConnectionContext connectionContext, List
messages.add(message);

sequence = sequence.add(BigInteger.ONE);

// take timestamp from the newest message for statistics
timestamp = Math.max(timestamp, message.getTimestamp());
}

ReplicaEvent replicaEvent = new ReplicaEvent()
.setEventType(ReplicaEventType.BATCH)
.setEventData(eventSerializer.serializeListOfObjects(messages))
.setTransactionId(transactionId)
.setTimestamp(timestamp)
.setReplicationProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, messageIds);

replicationMessageProducer.enqueueMainReplicaEvent(connectionContext, replicaEvent);
Expand Down
Loading

0 comments on commit b5b3975

Please sign in to comment.