Skip to content

Commit e789579

Browse files
committed
Resharding: producer can toggle numShards for Object types in the course of a delta chain
1 parent c2920b7 commit e789579

19 files changed

+1126
-179
lines changed

hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java

+28-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.netflix.hollow.api.producer;
1818

1919
import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
20+
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
2021
import static java.lang.System.currentTimeMillis;
2122
import static java.util.stream.Collectors.toList;
2223

@@ -92,14 +93,19 @@ abstract class AbstractHollowProducer {
9293

9394
boolean isInitialized;
9495

96+
private final long targetMaxTypeShardSize;
97+
private final boolean allowTypeResharding;
98+
private final boolean focusHoleFillInFewestShards;
99+
100+
95101
@Deprecated
96102
public AbstractHollowProducer(
97103
HollowProducer.Publisher publisher,
98104
HollowProducer.Announcer announcer) {
99105
this(new HollowFilesystemBlobStager(), publisher, announcer,
100106
Collections.emptyList(),
101107
new VersionMinterWithCounter(), null, 0,
102-
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, null,
108+
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, false, null,
103109
new DummyBlobStorageCleaner(), new BasicSingleProducerEnforcer(),
104110
null, true);
105111
}
@@ -111,7 +117,7 @@ public AbstractHollowProducer(
111117
this(b.stager, b.publisher, b.announcer,
112118
b.eventListeners,
113119
b.versionMinter, b.snapshotPublishExecutor,
114-
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards,
120+
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards, b.allowTypeResharding,
115121
b.metricsCollector, b.blobStorageCleaner, b.singleProducerEnforcer,
116122
b.hashCodeFinder, b.doIntegrityCheck);
117123
}
@@ -126,6 +132,7 @@ private AbstractHollowProducer(
126132
int numStatesBetweenSnapshots,
127133
long targetMaxTypeShardSize,
128134
boolean focusHoleFillInFewestShards,
135+
boolean allowTypeResharding,
129136
HollowMetricsCollector<HollowProducerMetrics> metricsCollector,
130137
HollowProducer.BlobStorageCleaner blobStorageCleaner,
131138
SingleProducerEnforcer singleProducerEnforcer,
@@ -140,11 +147,15 @@ private AbstractHollowProducer(
140147
this.numStatesBetweenSnapshots = numStatesBetweenSnapshots;
141148
this.hashCodeFinder = hashCodeFinder;
142149
this.doIntegrityCheck = doIntegrityCheck;
150+
this.targetMaxTypeShardSize = targetMaxTypeShardSize;
151+
this.allowTypeResharding = allowTypeResharding;
152+
this.focusHoleFillInFewestShards = focusHoleFillInFewestShards;
143153

144154
HollowWriteStateEngine writeEngine = hashCodeFinder == null
145155
? new HollowWriteStateEngine()
146156
: new HollowWriteStateEngine(hashCodeFinder);
147157
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
158+
writeEngine.allowTypeResharding(allowTypeResharding);
148159
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
149160

150161
this.objectMapper = new HollowObjectMapper(writeEngine);
@@ -283,6 +294,9 @@ private HollowProducer.ReadState restore(
283294
HollowWriteStateEngine writeEngine = hashCodeFinder == null
284295
? new HollowWriteStateEngine()
285296
: new HollowWriteStateEngine(hashCodeFinder);
297+
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
298+
writeEngine.allowTypeResharding(allowTypeResharding);
299+
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
286300
HollowWriteStateCreator.populateStateEngineWithTypeWriteStates(writeEngine, schemas);
287301
HollowObjectMapper newObjectMapper = new HollowObjectMapper(writeEngine);
288302
if (hashCodeFinder != null) {
@@ -380,15 +394,9 @@ long runCycle(
380394

381395
// 3. Produce a new state if there's work to do
382396
if (writeEngine.hasChangedSinceLastCycle()) {
383-
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash());
384397
boolean schemaChangedFromPriorVersion = readStates.hasCurrent() &&
385398
!writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine());
386-
if (schemaChangedFromPriorVersion) {
387-
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
388-
} else {
389-
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
390-
}
391-
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
399+
updateHeaderTags(writeEngine, toVersion, schemaChangedFromPriorVersion);
392400

393401
// 3a. Publish, run checks & validation, then announce new state consumers
394402
publish(listeners, toVersion, artifacts);
@@ -507,6 +515,17 @@ public void removeListener(HollowProducerEventListener listener) {
507515
listeners.removeListener(listener);
508516
}
509517

518+
private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion, boolean schemaChangedFromPriorVersion) {
519+
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash());
520+
if (schemaChangedFromPriorVersion) {
521+
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
522+
} else {
523+
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
524+
}
525+
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
526+
writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED);
527+
}
528+
510529
void populate(
511530
ProducerListeners listeners,
512531
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,

hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java

+29
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,7 @@ public static class Builder<B extends HollowProducer.Builder<B>> {
720720
Executor snapshotPublishExecutor = null;
721721
int numStatesBetweenSnapshots = 0;
722722
boolean focusHoleFillInFewestShards = false;
723+
boolean allowTypeResharding = false;
723724
long targetMaxTypeShardSize = DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE;
724725
HollowMetricsCollector<HollowProducerMetrics> metricsCollector;
725726
BlobStorageCleaner blobStorageCleaner = new DummyBlobStorageCleaner();
@@ -850,11 +851,31 @@ public B withTargetMaxTypeShardSize(long targetMaxTypeShardSize) {
850851
return (B) this;
851852
}
852853

854+
/**
855+
* Experimental: Setting this will focus the holes returned by the FreeOrdinalTracker for each state into as few shards as possible.
856+
*
857+
* This can be used by the consumers to reduce the work necessary to apply a delta, by skipping recreation of shards where no records are added.
858+
*/
853859
public B withFocusHoleFillInFewestShards(boolean focusHoleFillInFewestShards) {
854860
this.focusHoleFillInFewestShards = focusHoleFillInFewestShards;
855861
return (B) this;
856862
}
857863

864+
/**
865+
* Experimental: Setting this will allow producer to adjust number of shards per type in the course of a delta chain.
866+
*
867+
* Consumer-side delta transitions work by making a copy of one shard at a time, so the ability to accommodate more
868+
* data in a type by growing the number of shards instead of the size of shards leads to means consumers can apply
869+
* delta transitions with a memory overhead (equal to the configured max shard size).
870+
*
871+
* Requires integrity check to be enabled, and honors numShards pinned using annotation in data model.
872+
* Also requires consumers to be on a recent Hollow library version that supports re-sharding at the time of delta application.
873+
*/
874+
public B withTypeResharding(boolean allowTypeResharding) {
875+
this.allowTypeResharding = allowTypeResharding;
876+
return (B) this;
877+
}
878+
858879
public B withMetricsCollector(HollowMetricsCollector<HollowProducerMetrics> metricsCollector) {
859880
this.metricsCollector = metricsCollector;
860881
return (B) this;
@@ -887,6 +908,14 @@ public B noIntegrityCheck() {
887908
}
888909

889910
protected void checkArguments() {
911+
if (allowTypeResharding == true && doIntegrityCheck == false) { // type resharding feature rollout
912+
throw new IllegalArgumentException("Enabling type re-sharding requires integrity check to also be enabled");
913+
}
914+
if (allowTypeResharding == true && focusHoleFillInFewestShards == true) { // type re-sharding feature rollout
915+
// More thorough testing required before enabling these features to work in tandem
916+
// simple test case for when features are allowed to work together passes, see {@code testReshardingWithFocusHoleFillInFewestShards}
917+
throw new IllegalArgumentException("Producer does not yet support using both re-sharding and focusHoleFillInFewestShards features in tandem");
918+
}
890919
if (stager != null && compressor != null) {
891920
throw new IllegalArgumentException(
892921
"Both a custom BlobStager and BlobCompressor were specified -- please specify only one of these.");

hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AbstractProducerMetricsListener.java

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.netflix.hollow.api.producer.HollowProducer;
2121
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
2222
import java.time.Duration;
23+
import java.util.Map;
2324
import java.util.OptionalLong;
2425

2526
/**
@@ -87,9 +88,13 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status
8788

8889
HollowReadStateEngine stateEngine = readState.getStateEngine();
8990
dataSizeBytes = stateEngine.calcApproxDataSize();
91+
Map<String, Integer> numShardsPerType = stateEngine.numShardsPerType();
92+
Map<String, Long> shardSizePerType = stateEngine.calcApproxShardSizePerType();
9093

9194
announcementMetricsBuilder
9295
.setDataSizeBytes(dataSizeBytes)
96+
.setNumShardsPerType(numShardsPerType)
97+
.setShardSizePerType(shardSizePerType)
9398
.setIsAnnouncementSuccess(isAnnouncementSuccess)
9499
.setAnnouncementDurationMillis(elapsed.toMillis());
95100
lastAnnouncementSuccessTimeNanoOptional.ifPresent(announcementMetricsBuilder::setLastAnnouncementSuccessTimeNano);

hollow/src/main/java/com/netflix/hollow/api/producer/metrics/AnnouncementMetrics.java

+21
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
*/
1717
package com.netflix.hollow.api.producer.metrics;
1818

19+
import java.util.Map;
1920
import java.util.OptionalLong;
2021

2122
public class AnnouncementMetrics {
2223

2324
private long dataSizeBytes; // Heap footprint of announced blob in bytes
25+
private Map<String, Integer> numShardsPerType;
26+
private Map<String, Long> shardSizePerType;
2427
private long announcementDurationMillis; // Announcement duration in ms, only applicable to completed cycles (skipped cycles dont announce)
2528
private boolean isAnnouncementSuccess; // true if announcement was successful, false if announcement failed
2629
private OptionalLong lastAnnouncementSuccessTimeNano; // monotonic time of last successful announcement (no relation to wall clock), N/A until first successful announcement
@@ -29,6 +32,12 @@ public class AnnouncementMetrics {
2932
public long getDataSizeBytes() {
3033
return dataSizeBytes;
3134
}
35+
public Map<String, Integer> getNumShardsPerType() {
36+
return numShardsPerType;
37+
}
38+
public Map<String, Long> getShardSizePerType() {
39+
return shardSizePerType;
40+
}
3241
public long getAnnouncementDurationMillis() {
3342
return announcementDurationMillis;
3443
}
@@ -41,6 +50,8 @@ public OptionalLong getLastAnnouncementSuccessTimeNano() {
4150

4251
private AnnouncementMetrics(Builder builder) {
4352
this.dataSizeBytes = builder.dataSizeBytes;
53+
this.numShardsPerType = builder.numShardsPerType;
54+
this.shardSizePerType = builder.shardSizePerType;
4455
this.announcementDurationMillis = builder.announcementDurationMillis;
4556
this.isAnnouncementSuccess = builder.isAnnouncementSuccess;
4657
this.lastAnnouncementSuccessTimeNano = builder.lastAnnouncementSuccessTimeNano;
@@ -51,6 +62,8 @@ public static final class Builder {
5162
private long announcementDurationMillis;
5263
private boolean isAnnouncementSuccess;
5364
private OptionalLong lastAnnouncementSuccessTimeNano;
65+
private Map<String, Integer> numShardsPerType;
66+
private Map<String, Long> shardSizePerType;
5467

5568
public Builder() {
5669
lastAnnouncementSuccessTimeNano = OptionalLong.empty();
@@ -60,6 +73,14 @@ public Builder setDataSizeBytes(long dataSizeBytes) {
6073
this.dataSizeBytes = dataSizeBytes;
6174
return this;
6275
}
76+
public Builder setNumShardsPerType(Map<String, Integer> numShardsPerType) {
77+
this.numShardsPerType = numShardsPerType;
78+
return this;
79+
}
80+
public Builder setShardSizePerType(Map<String, Long> shardSizePerType) {
81+
this.shardSizePerType = shardSizePerType;
82+
return this;
83+
}
6384
public Builder setAnnouncementDurationMillis(long announcementDurationMillis) {
6485
this.announcementDurationMillis = announcementDurationMillis;
6586
return this;

hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java

+7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ public interface HollowStateEngine extends HollowDataset {
4747
*/
4848
String HEADER_TAG_SCHEMA_CHANGE = "hollow.schema.changedFromPriorVersion";
4949

50+
/**
51+
* A header tag indicating that num shards for a type has changed since the prior version. Its value encodes
52+
* the type(s) that were re-sharded along with the before and after num shards in the fwd delta direction.
53+
* For e.g. Movie:(2,4) Actor:(8,4)
54+
*/
55+
String HEADER_TAG_TYPE_RESHARDING_INVOKED = "hollow.type.resharding.invoked";
56+
5057
/**
5158
* A header tag containing the hash of serialized hollow schema.
5259
*/

hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowReadStateEngine.java

+24
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,30 @@ public long calcApproxDataSize() {
176176
.sum();
177177
}
178178

179+
/**
180+
* @return the no. of shards for each type in the read state
181+
*/
182+
public Map<String, Integer> numShardsPerType() {
183+
Map<String, Integer> typeShards = new HashMap<>();
184+
for (String type : this.getAllTypes()) {
185+
HollowTypeReadState typeState = this.getTypeState(type);
186+
typeShards.put(type, typeState.numShards());
187+
}
188+
return typeShards;
189+
}
190+
191+
/**
192+
* @return the approx heap footprint of a single shard in bytes, for each type in the read state
193+
*/
194+
public Map<String, Long> calcApproxShardSizePerType() {
195+
Map<String, Long> typeShardSizes = new HashMap<>();
196+
for (String type : this.getAllTypes()) {
197+
HollowTypeReadState typeState = this.getTypeState(type);
198+
typeShardSizes.put(type, typeState.getApproximateShardSizeInBytes());
199+
}
200+
return typeShardSizes;
201+
}
202+
179203
@Override
180204
public HollowTypeDataAccess getTypeDataAccess(String type) {
181205
return typeStates.get(type);

hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,14 @@ public HollowTypeReadState getTypeState() {
193193
* @return an approximate accounting of the current cost of the "ordinal holes" in this type state.
194194
*/
195195
public abstract long getApproximateHoleCostInBytes();
196-
196+
197+
/**
198+
* @return an approximate accounting of the current heap footprint occupied by each shard of this type state.
199+
*/
200+
public long getApproximateShardSizeInBytes() {
201+
return getApproximateHeapFootprintInBytes() / numShards();
202+
}
203+
197204
/**
198205
* @return The number of shards into which this type is split. Sharding is transparent, so this has no effect on normal usage.
199206
*/

hollow/src/main/java/com/netflix/hollow/core/write/HollowBlobWriter.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ public void run() {
112112
partStreams.flush();
113113
}
114114

115-
116115
/**
117116
* Serialize the changes necessary to transition a consumer from the previous state
118117
* to the current state as a delta blob.
@@ -238,7 +237,7 @@ public void run() {
238237
HollowSchema schema = typeState.getSchema();
239238
schema.writeTo(partStream);
240239

241-
writeNumShards(partStream, typeState.getNumShards());
240+
writeNumShards(partStream, typeState.getRevNumShards());
242241

243242
typeState.writeReverseDelta(partStream);
244243
}

0 commit comments

Comments
 (0)