Skip to content

Commit

Permalink
fix conflict.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed May 14, 2024
1 parent 854a056 commit 677ed2f
Showing 1 changed file with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
Expand Down Expand Up @@ -90,7 +91,9 @@ public class Consumer {
private final Rate msgOut;
private final Rate msgRedeliver;
private final LongAdder msgOutCounter;
private final LongAdder msgRedeliverCounter;
private final LongAdder bytesOutCounter;
private final LongAdder messageAckCounter;
private final Rate messageAckRate;

private volatile long lastConsumedTimestamp;
Expand Down Expand Up @@ -152,6 +155,9 @@ public class Consumer {
@Getter
private final SchemaType schemaType;

@Getter
private final Instant connectedSince = Instant.now();

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Expand Down Expand Up @@ -182,8 +188,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.msgOut = new Rate();
this.chunkedMessageRate = new Rate();
this.msgRedeliver = new Rate();
this.msgRedeliverCounter = new LongAdder();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.messageAckCounter = new LongAdder();
this.messageAckRate = new Rate();
this.appId = appId;

Expand All @@ -201,7 +209,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats.setAddress(cnx.clientSourceAddressAndPort());
stats.consumerName = consumerName;
stats.appId = appId;
stats.setConnectedSince(DateFormatter.now());
stats.setConnectedSince(DateFormatter.format(connectedSince));
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;

Expand Down Expand Up @@ -239,8 +247,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.consumerName = consumerName;
this.msgOut = null;
this.msgRedeliver = null;
this.msgRedeliverCounter = null;
this.msgOutCounter = null;
this.bytesOutCounter = null;
this.messageAckCounter = null;
this.messageAckRate = null;
this.pendingAcks = null;
this.stats = null;
Expand Down Expand Up @@ -503,6 +513,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
return future
.thenApply(v -> {
this.messageAckRate.recordEvent(v);
this.messageAckCounter.add(v);
return null;
});
}
Expand Down Expand Up @@ -923,6 +934,14 @@ public long getBytesOutCounter() {
return bytesOutCounter.longValue();
}

public long getMessageAckCounter() {
return messageAckCounter.sum();
}

public long getMessageRedeliverCounter() {
return msgRedeliverCounter.sum();
}

public int getUnackedMessages() {
return unackedMessages;
}
Expand All @@ -935,7 +954,7 @@ public KeySharedMeta getKeySharedMeta() {
public String toString() {
if (subscription != null && cnx != null) {
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
.add("consumerName", consumerName).add("address", this.cnx.toString()).toString();
} else {
return MoreObjects.toStringHelper(this).add("consumerId", consumerId)
.add("consumerName", consumerName).toString();
Expand Down Expand Up @@ -1060,6 +1079,8 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) {
}

msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue());
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
} else {
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
Expand Down Expand Up @@ -1092,6 +1113,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
msgRedeliverCounter.add(totalRedeliveryMessages);

int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);

Expand Down Expand Up @@ -1154,6 +1176,14 @@ public String getClientAddress() {
return clientAddress;
}

public String getClientAddressAndPort() {
return cnx.clientSourceAddressAndPort();
}

public String getClientVersion() {
return cnx.getClientVersion();
}

public MessageId getStartMessageId() {
return startMessageId;
}
Expand Down

0 comments on commit 677ed2f

Please sign in to comment.