Skip to content

Commit 45197ba

Browse files
authored
Producer- report a monotonically increasing delta chain version count (starting from 1) in blob header and metrics (#708)
* Producer- write monotonically increasing delta chain version counter * Producer- report metric for delta chain version counter
1 parent 85f7038 commit 45197ba

File tree

7 files changed

+55
-1
lines changed

7 files changed

+55
-1
lines changed

hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java

+15
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.netflix.hollow.api.producer;
1818

1919
import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
20+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
2021
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
2122
import static java.lang.System.currentTimeMillis;
2223
import static java.util.stream.Collectors.toList;
@@ -524,6 +525,20 @@ private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion
524525
}
525526
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
526527
writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED);
528+
529+
long prevDeltaChainVersionCounter = 0l;
530+
if (readStates.hasCurrent()) {
531+
String str = readStates.current().getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER);
532+
if (str != null) {
533+
try {
534+
prevDeltaChainVersionCounter = Long.valueOf(str);
535+
} catch (NumberFormatException e) {
536+
// ignore, prevDeltaChainVersionCounter remains 0
537+
}
538+
}
539+
}
540+
long deltaChainVersionCounter = prevDeltaChainVersionCounter + 1;
541+
writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(deltaChainVersionCounter));
527542
}
528543

529544
void populate(

hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java

+10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.netflix.hollow.api.producer.AbstractHollowProducerListener;
2020
import com.netflix.hollow.api.producer.HollowProducer;
21+
import com.netflix.hollow.core.HollowStateEngine;
2122
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
2223
import java.time.Duration;
2324
import java.util.Map;
@@ -99,6 +100,15 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status
99100
.setAnnouncementDurationMillis(elapsed.toMillis());
100101
lastAnnouncementSuccessTimeNanoOptional.ifPresent(announcementMetricsBuilder::setLastAnnouncementSuccessTimeNano);
101102

103+
if (stateEngine.getHeaderTag(HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER) != null) {
104+
try {
105+
long deltaChainVersionCounter = Long.parseLong(stateEngine.getHeaderTag(HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
106+
announcementMetricsBuilder.setDeltaChainVersionCounter(deltaChainVersionCounter);
107+
} catch (NumberFormatException e) {
108+
// ignore
109+
}
110+
}
111+
102112
announcementMetricsReporting(announcementMetricsBuilder.build());
103113
}
104114

hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AnnouncementMetrics.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class AnnouncementMetrics {
2727
private long announcementDurationMillis; // Announcement duration in ms, only applicable to completed cycles (skipped cycles dont announce)
2828
private boolean isAnnouncementSuccess; // true if announcement was successful, false if announcement failed
2929
private OptionalLong lastAnnouncementSuccessTimeNano; // monotonic time of last successful announcement (no relation to wall clock), N/A until first successful announcement
30-
30+
private OptionalLong deltaChainVersionCounter;
3131

3232
public long getDataSizeBytes() {
3333
return dataSizeBytes;
@@ -47,6 +47,10 @@ public boolean getIsAnnouncementSuccess() {
4747
public OptionalLong getLastAnnouncementSuccessTimeNano() {
4848
return lastAnnouncementSuccessTimeNano;
4949
}
50+
public OptionalLong getDeltaChainVersionCounter() {
51+
return deltaChainVersionCounter;
52+
}
53+
5054

5155
private AnnouncementMetrics(Builder builder) {
5256
this.dataSizeBytes = builder.dataSizeBytes;
@@ -55,18 +59,21 @@ private AnnouncementMetrics(Builder builder) {
5559
this.announcementDurationMillis = builder.announcementDurationMillis;
5660
this.isAnnouncementSuccess = builder.isAnnouncementSuccess;
5761
this.lastAnnouncementSuccessTimeNano = builder.lastAnnouncementSuccessTimeNano;
62+
this.deltaChainVersionCounter = builder.deltaChainVersionCounter;
5863
}
5964

6065
public static final class Builder {
6166
private long dataSizeBytes;
6267
private long announcementDurationMillis;
6368
private boolean isAnnouncementSuccess;
6469
private OptionalLong lastAnnouncementSuccessTimeNano;
70+
private OptionalLong deltaChainVersionCounter;
6571
private Map<String, Integer> numShardsPerType;
6672
private Map<String, Long> shardSizePerType;
6773

6874
public Builder() {
6975
lastAnnouncementSuccessTimeNano = OptionalLong.empty();
76+
deltaChainVersionCounter = OptionalLong.empty();
7077
}
7178

7279
public Builder setDataSizeBytes(long dataSizeBytes) {
@@ -93,6 +100,10 @@ public Builder setLastAnnouncementSuccessTimeNano(long lastAnnouncementSuccessTi
93100
this.lastAnnouncementSuccessTimeNano = OptionalLong.of(lastAnnouncementSuccessTimeNano);
94101
return this;
95102
}
103+
public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) {
104+
this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter);
105+
return this;
106+
}
96107

97108
public AnnouncementMetrics build() {
98109
return new AnnouncementMetrics(this);

hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java

+5
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ public interface HollowStateEngine extends HollowDataset {
7575
*/
7676
String HEADER_TAG_PRODUCER_TO_VERSION = "hollow.blob.to.version";
7777

78+
/**
79+
* A header tag indicating monotonically increasing version in the same delta chain
80+
*/
81+
String HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER = "hollow.delta.chain.version.counter";
82+
7883
@Override
7984
List<HollowSchema> getSchemas();
8085

hollow/src/test/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListenerTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.netflix.hollow.api.producer.metrics;
22

3+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
4+
import static org.mockito.ArgumentMatchers.eq;
35
import static org.mockito.Mockito.when;
46

57
import com.netflix.hollow.api.producer.HollowProducer;
@@ -114,6 +116,7 @@ public void cycleMetricsReporting(CycleMetrics cycleMetrics) {
114116

115117
@Test
116118
public void testAnnouncementCompleteWithSuccess() {
119+
when(mockStateEngine.getHeaderTag(eq(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER))).thenReturn("1");
117120
final class TestProducerMetricsListener extends AbstractProducerMetricsListener {
118121
@Override
119122
public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics) {
@@ -124,6 +127,8 @@ public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics
124127
announcementMetrics.getAnnouncementDurationMillis());
125128
Assert.assertNotEquals(OptionalLong.of(TEST_LAST_ANNOUNCEMENT_NANOS),
126129
announcementMetrics.getLastAnnouncementSuccessTimeNano());
130+
Assert.assertEquals(OptionalLong.of(1l),
131+
announcementMetrics.getDeltaChainVersionCounter());
127132
}
128133
}
129134

@@ -146,6 +151,8 @@ public void announcementMetricsReporting(AnnouncementMetrics announcementMetrics
146151
announcementMetrics.getAnnouncementDurationMillis());
147152
Assert.assertEquals(OptionalLong.of(TEST_LAST_ANNOUNCEMENT_NANOS),
148153
announcementMetrics.getLastAnnouncementSuccessTimeNano());
154+
Assert.assertEquals(OptionalLong.empty(),
155+
announcementMetrics.getDeltaChainVersionCounter());
149156
}
150157
}
151158

hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package com.netflix.hollow.core.util;
1818

19+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
1920
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;
2021
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
2122
import static org.junit.Assert.assertEquals;
@@ -46,6 +47,7 @@ public void recreatesUsingReadEngine() throws IOException {
4647
}
4748
writeEngine.addHeaderTag("CopyTag", "copied");
4849
writeEngine.addHeaderTag(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis()));
50+
writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, "1");
4951
String toVersion = String.valueOf(System.currentTimeMillis());
5052
writeEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, toVersion);
5153

@@ -55,6 +57,7 @@ public void recreatesUsingReadEngine() throws IOException {
5557
HollowWriteStateEngine recreatedWriteEngine = HollowWriteStateCreator.recreateAndPopulateUsingReadEngine(readEngine);
5658
assertEquals(cycleStartTime, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_METRIC_CYCLE_START));
5759
assertEquals(readEngineToVersion, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION));
60+
assertEquals("1", recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
5861
assertEquals(8, recreatedWriteEngine.getTypeState("Integer").getNumShards());
5962

6063
HollowReadStateEngine recreatedReadEngine = StateEngineRoundTripper.roundTripSnapshot(recreatedWriteEngine);

hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.netflix.hollow.core.write;
22

3+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
34
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
45
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
56
import static org.junit.Assert.assertEquals;
@@ -61,10 +62,12 @@ public void testHeaderTagsOnDeltaAndReverseDelta() {
6162
consumer.triggerRefreshTo(version3); // delta transition
6263
assertEquals("3", consumer.getStateEngine().getHeaderTag(TEST_TAG));
6364
assertEquals(String.valueOf(version3), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION));
65+
assertEquals("3", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
6466

6567
consumer.triggerRefreshTo(version2); // reverse delta transition
6668
assertEquals("2", consumer.getStateEngine().getHeaderTag(TEST_TAG));
6769
assertEquals(String.valueOf(version2), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION));
70+
assertEquals("2", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
6871

6972
}
7073

0 commit comments

Comments
 (0)