Skip to content

Commit ff6fa9e

Browse files
committed
Consumer- report delta chain version counter
1 parent 028b616 commit ff6fa9e

File tree

3 files changed

+54
-15
lines changed

3 files changed

+54
-15
lines changed

hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java

+24-10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.netflix.hollow.api.consumer.metrics;
1818

1919
import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
20+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
2021
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT;
2122
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;
2223

@@ -59,12 +60,15 @@ public abstract class AbstractRefreshMetricsListener extends AbstractRefreshList
5960
private final Map<Long, Long> announcementTimestamps;
6061
private volatile boolean namespacePinnedPreviously;
6162

63+
private final Map<Long, Long> cycleVersionDeltaCounters; // delta chain version counter for each cycle version
64+
6265
public AbstractRefreshMetricsListener() {
6366
lastRefreshTimeNanoOptional = OptionalLong.empty();
6467
consecutiveFailures = 0l;
6568
cycleVersionStartTimes = new HashMap<>();
6669
announcementTimestamps = new HashMap<>();
6770
namespacePinnedPreviously = false;
71+
cycleVersionDeltaCounters = new HashMap<>();
6872
}
6973

7074
public void refreshStarted(long currentVersion, long requestedVersion) {
@@ -73,7 +77,9 @@ public void refreshStarted(long currentVersion, long requestedVersion) {
7377
refreshMetricsBuilder = new ConsumerRefreshMetrics.Builder();
7478
refreshMetricsBuilder.setIsInitialLoad(currentVersion == VERSION_NONE);
7579
refreshMetricsBuilder.setUpdatePlanDetails(updatePlanDetails);
76-
cycleVersionStartTimes.clear(); // clear map to avoid accumulation over time
80+
// clear maps to avoid accumulation over time
81+
cycleVersionStartTimes.clear();
82+
cycleVersionDeltaCounters.clear();
7783
}
7884

7985
@Override
@@ -91,7 +97,7 @@ public void versionDetected(HollowConsumer.VersionInfo requestedVersionInfo) {
9197
// or for the newVersion). Don't record this metric when a namespace was pinned previously and gets unpinned
9298
// in the next cycle because this metric will record the refresh duration from the latest announced version.
9399
if (!(namespacePinnedPreviously || isPinned)) {
94-
trackTimestampsFromHeaders(requestedVersionInfo.getVersion(),
100+
trackHeaderTagInVersion(requestedVersionInfo.getVersion(),
95101
requestedVersionInfo.getAnnouncementMetadata().get(), HEADER_TAG_METRIC_ANNOUNCEMENT, announcementTimestamps);
96102
}
97103
namespacePinnedPreviously = isPinned;
@@ -160,6 +166,9 @@ public void refreshSuccessful(long beforeVersion, long afterVersion, long reques
160166
if (cycleVersionStartTimes.containsKey(afterVersion)) {
161167
refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion));
162168
}
169+
if (cycleVersionDeltaCounters.containsKey(afterVersion)) {
170+
refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion));
171+
}
163172

164173
if (afterVersion == requestedVersion && announcementTimestamps.containsKey(afterVersion)) {
165174
refreshMetricsBuilder.setAnnouncementTimestamp(announcementTimestamps.get(afterVersion));
@@ -186,32 +195,37 @@ public void refreshFailed(long beforeVersion, long afterVersion, long requestedV
186195
if (cycleVersionStartTimes.containsKey(afterVersion)) {
187196
refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion));
188197
}
198+
if (cycleVersionDeltaCounters.containsKey(afterVersion)) {
199+
refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion));
200+
}
189201

190202
noFailRefreshEndMetricsReporting(refreshMetricsBuilder.build());
191203
}
192204

193205
@Override
194206
public void snapshotUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) {
195-
trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
207+
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
208+
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters);
196209
}
197210

198211
@Override
199212
public void deltaUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) {
200-
trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
213+
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
214+
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters);
201215
}
202216

203217
/**
204-
* If the blob header contains the timestamps like producer cycle start and announcement then save those values in
205-
* the maps tracking version to cycle start time and version to announcement respectively.
218+
* If the blob header contains a value for the given header tag (like producer cycle start time) then save that value in
219+
* a maps tracking the value per version in this refresh.
206220
*/
207-
private void trackTimestampsFromHeaders(long version, Map<String, String> headers, String headerTag, Map<Long, Long> timestampsMap) {
221+
private void trackHeaderTagInVersion(long version, Map<String, String> headers, String headerTag, Map<Long, Long> tracker) {
208222
if (headers != null) {
209223
String headerTagValue = headers.get(headerTag);
210224
if (headerTagValue != null && !headerTagValue.isEmpty()) {
211225
try {
212-
Long timestamp = Long.valueOf(headerTagValue);
213-
if (timestamp != null) {
214-
timestampsMap.put(version, timestamp);
226+
Long val = Long.valueOf(headerTagValue);
227+
if (val != null) {
228+
tracker.put(version, val);
215229
}
216230
} catch (NumberFormatException e) {
217231
log.log(Level.WARNING, "Blob header contained " + headerTag + " but its value could"

hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ConsumerRefreshMetrics {
3232
private long refreshEndTimeNano; // monotonic system time when refresh ended
3333
private OptionalLong cycleStartTimestamp; // timestamp in millis of when cycle started for the loaded data version
3434
private OptionalLong announcementTimestamp; // timestamp in milliseconds to mark announcement for the loaded data version
35+
private OptionalLong deltaChainVersionCounter; // the sequence number of a version in a delta chain
3536

3637
/**
3738
* A class that contains details of the consumer refresh update plan that may be useful to report as metrics or logs.
@@ -84,8 +85,12 @@ public long getRefreshEndTimeNano() {
8485
public OptionalLong getCycleStartTimestamp() {
8586
return cycleStartTimestamp;
8687
}
87-
88-
public OptionalLong getAnnouncementTimestamp() { return announcementTimestamp; }
88+
public OptionalLong getAnnouncementTimestamp() {
89+
return announcementTimestamp;
90+
}
91+
public OptionalLong getDeltaChainVersionCounter() {
92+
return deltaChainVersionCounter;
93+
}
8994

9095
private ConsumerRefreshMetrics(Builder builder) {
9196
this.durationMillis = builder.durationMillis;
@@ -98,6 +103,7 @@ private ConsumerRefreshMetrics(Builder builder) {
98103
this.refreshEndTimeNano = builder.refreshEndTimeNano;
99104
this.cycleStartTimestamp = builder.cycleStartTimestamp;
100105
this.announcementTimestamp = builder.announcementTimestamp;
106+
this.deltaChainVersionCounter = builder.deltaChainVersionCounter;
101107
}
102108

103109
public static final class Builder {
@@ -111,11 +117,13 @@ public static final class Builder {
111117
private long refreshEndTimeNano;
112118
private OptionalLong cycleStartTimestamp;
113119
private OptionalLong announcementTimestamp;
120+
private OptionalLong deltaChainVersionCounter;
114121

115122
public Builder() {
116123
refreshSuccessAgeMillisOptional = OptionalLong.empty();
117124
cycleStartTimestamp = OptionalLong.empty();
118125
announcementTimestamp = OptionalLong.empty();
126+
deltaChainVersionCounter = OptionalLong.empty();
119127
}
120128

121129
public Builder setDurationMillis(long durationMillis) {
@@ -160,6 +168,10 @@ public Builder setAnnouncementTimestamp(long announcementTimestamp) {
160168
this.announcementTimestamp = OptionalLong.of(announcementTimestamp);
161169
return this;
162170
}
171+
public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) {
172+
this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter);
173+
return this;
174+
}
163175

164176
public ConsumerRefreshMetrics build() {
165177
return new ConsumerRefreshMetrics(this);

hollow/src/test/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListenerTest.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.netflix.hollow.api.consumer.metrics;
22

33
import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
4+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
45
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT;
56
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;
67
import static org.junit.Assert.assertEquals;
@@ -11,13 +12,12 @@
1112
import com.netflix.hollow.api.producer.HollowProducer;
1213
import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager;
1314
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
15+
import com.netflix.hollow.test.InMemoryBlobStore;
1416
import java.util.ArrayList;
1517
import java.util.HashMap;
1618
import java.util.List;
1719
import java.util.Map;
1820
import java.util.Optional;
19-
20-
import com.netflix.hollow.test.InMemoryBlobStore;
2121
import org.junit.Assert;
2222
import org.junit.Before;
2323
import org.junit.Test;
@@ -129,13 +129,15 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
129129
Assert.assertNotEquals(0l, refreshMetrics.getRefreshEndTimeNano());
130130
assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong());
131131
assertEquals(TEST_ANNOUNCEMENT_TIMESTAMP, refreshMetrics.getAnnouncementTimestamp().getAsLong());
132+
assertEquals(1l, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
132133
}
133134
}
134135
SuccessTestRefreshMetricsListener successTestRefreshMetricsListener = new SuccessTestRefreshMetricsListener();
135136
successTestRefreshMetricsListener.refreshStarted(TEST_VERSION_LOW, TEST_VERSION_HIGH);
136137

137138
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP));
138139
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP));
140+
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l));
139141
successTestRefreshMetricsListener.snapshotUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH);
140142

141143
successTestRefreshMetricsListener.refreshSuccessful(TEST_VERSION_LOW, TEST_VERSION_HIGH, TEST_VERSION_HIGH);
@@ -152,6 +154,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
152154
Assert.assertNotEquals(0l, refreshMetrics.getRefreshEndTimeNano());
153155
Assert.assertFalse(refreshMetrics.getCycleStartTimestamp().isPresent());
154156
Assert.assertFalse(refreshMetrics.getAnnouncementTimestamp().isPresent());
157+
Assert.assertFalse(refreshMetrics.getDeltaChainVersionCounter().isPresent());
155158
}
156159
}
157160
FailureTestRefreshMetricsListener failTestRefreshMetricsListener = new FailureTestRefreshMetricsListener();
@@ -168,6 +171,7 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
168171
assertEquals(3, refreshMetrics.getUpdatePlanDetails().getNumSuccessfulTransitions());
169172
assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong());
170173
assertEquals(TEST_ANNOUNCEMENT_TIMESTAMP, refreshMetrics.getAnnouncementTimestamp().getAsLong());
174+
assertEquals(3l, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
171175
}
172176
}
173177
List<HollowConsumer.Blob.BlobType> testTransitionSequence = new ArrayList<HollowConsumer.Blob.BlobType>() {{
@@ -183,16 +187,19 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
183187
successTestRefreshMetricsListener.blobLoaded(null);
184188
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP-2));
185189
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP-2));
190+
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l));
186191
successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH-2);
187192

188193
successTestRefreshMetricsListener.blobLoaded(null);
189194
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP-1));
190195
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP-1));
196+
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(2l));
191197
successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH-1);
192198

193199
successTestRefreshMetricsListener.blobLoaded(null);
194200
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP));
195201
testHeaderTags.put(HEADER_TAG_METRIC_ANNOUNCEMENT, String.valueOf(TEST_ANNOUNCEMENT_TIMESTAMP));
202+
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(3l));
196203
successTestRefreshMetricsListener.deltaUpdateOccurred(null, mockStateEngine, TEST_VERSION_HIGH);
197204

198205
successTestRefreshMetricsListener.refreshSuccessful(TEST_VERSION_LOW, TEST_VERSION_HIGH, TEST_VERSION_HIGH);
@@ -205,6 +212,7 @@ class FailureTestRefreshMetricsListener extends AbstractRefreshMetricsListener {
205212
public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
206213
assertEquals(1, refreshMetrics.getUpdatePlanDetails().getNumSuccessfulTransitions());
207214
assertEquals(TEST_CYCLE_START_TIMESTAMP, refreshMetrics.getCycleStartTimestamp().getAsLong());
215+
assertEquals(1l, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
208216
}
209217
}
210218
List<HollowConsumer.Blob.BlobType> testTransitionSequence = new ArrayList<HollowConsumer.Blob.BlobType>() {{
@@ -219,14 +227,15 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
219227

220228
failureTestRefreshMetricsListener.blobLoaded(null);
221229
testHeaderTags.put(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(TEST_CYCLE_START_TIMESTAMP));
230+
testHeaderTags.put(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(1l));
222231
failureTestRefreshMetricsListener.snapshotUpdateOccurred(null, mockStateEngine, TEST_VERSION_LOW);
223232

224233
failureTestRefreshMetricsListener.refreshFailed(TEST_VERSION_LOW-1, TEST_VERSION_LOW, TEST_VERSION_HIGH, null);
225234

226235
}
227236

228237
@Test
229-
public void testCycleStart() { // also exercises reverse delta transition
238+
public void testCycles() { // also exercises reverse delta transition
230239
InMemoryBlobStore blobStore = new InMemoryBlobStore();
231240
HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager();
232241
HollowProducer p = HollowProducer
@@ -252,16 +261,20 @@ public void refreshEndMetricsReporting(ConsumerRefreshMetrics refreshMetrics) {
252261
switch (run) {
253262
case 1:
254263
assertEquals(1L, refreshMetrics.getCycleStartTimestamp().getAsLong());
264+
assertEquals(1L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
255265
break;
256266
case 2:
257267
case 5:
258268
assertEquals(2L, refreshMetrics.getCycleStartTimestamp().getAsLong());
269+
assertEquals(2L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
259270
break;
260271
case 3:
261272
assertEquals(1L, refreshMetrics.getCycleStartTimestamp().getAsLong());
273+
assertEquals(1L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
262274
break;
263275
case 4:
264276
assertEquals(3L, refreshMetrics.getCycleStartTimestamp().getAsLong());
277+
assertEquals(3L, refreshMetrics.getDeltaChainVersionCounter().getAsLong());
265278
break;
266279
}
267280
}

0 commit comments

Comments
 (0)