Resharding: Consumer-side support for list, set, map types
Sunjeet committed Sep 23, 2024
1 parent 4d7eba8 commit e8954fb
Showing 75 changed files with 3,937 additions and 1,531 deletions.
2 changes: 1 addition & 1 deletion docs/advanced-topics.md
@@ -372,7 +372,7 @@ The number of bits used to represent a field which is one of the types (`INT`, `

32 bits are used to represent a `FLOAT`, and 64 bits are used to represent a `DOUBLE`.

`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of number of bits required to represent the maximum offset, plus one.
`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of bits required to represent the maximum offset, plus one.

Each field type may be assigned a null value. For `INT`, `LONG`, and `REFERENCE` fields, null is encoded as a value with all ones. For `FLOAT` and `DOUBLE` fields, null is encoded as special bit sequences. For `STRING` and `BYTES` fields, null is encoded by setting a designated null bit at the beginning of each field, followed by the end offset of the last populated value for that field.
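A minimal sketch of the offset scheme described above, with plain arrays standing in for the packed byte array and the fixed-length offset field (`valueForOrdinal` is illustrative, not Hollow API):

```java
import java.util.Arrays;

class OffsetSketch {
    // offsets[n] holds the end offset of record n's value in the packed array;
    // record n's value begins where record n-1's value ends.
    static byte[] valueForOrdinal(byte[] packed, long[] offsets, int n) {
        long begin = (n == 0) ? 0 : offsets[n - 1];
        long end = offsets[n];
        return Arrays.copyOfRange(packed, (int) begin, (int) end);
    }
}
```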

@@ -10,7 +10,7 @@ public class HollowDiffUIServerTest {
public void test() throws Exception {
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();

HollowDiffUIServer server = new HollowDiffUIServer();
HollowDiffUIServer server = new HollowDiffUIServer(0);

server.addDiff("diff", testDiff);

@@ -22,7 +22,7 @@ public void test() throws Exception {
public void testBackwardsCompatibiltyWithJettyImplementation() throws Exception {
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();

com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer();
com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer(0);

server.addDiff("diff", testDiff);

@@ -337,30 +337,25 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
if(!filter.includes(typeName)) {
HollowListTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema), numShards);
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.includes(typeName)) {
HollowSetTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema), numShards);
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.includes(typeName)) {
HollowMapTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema), numShards);
}
}

return typeName;
}

private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException {
stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
throw new IllegalArgumentException("Number of shards must be a power of 2!");
@@ -376,6 +371,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
int numShards = readNumShards(in);
HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
if (shouldReshard(typeState, typeState.numShards(), numShards)) {
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);
reshardingStrategy.reshard(typeState, typeState.numShards(), numShards);
}
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
@@ -384,6 +383,14 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
return schema.getName();
}

private boolean shouldReshard(HollowTypeReadState typeState, int currNumShards, int deltaNumShards) {
if (typeState instanceof HollowObjectTypeReadState) {
return currNumShards != 0 && deltaNumShards != 0 && currNumShards != deltaNumShards;
} else {
return false;
}
}

private int readNumShards(HollowBlobInput in) throws IOException {
int backwardsCompatibilityBytes = VarInt.readVInt(in);

@@ -0,0 +1,23 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;

public abstract class HollowTypeDataElements {

public int maxOrdinal;

public GapEncodedVariableLengthIntegerReader encodedAdditions;
public GapEncodedVariableLengthIntegerReader encodedRemovals;

public final ArraySegmentRecycler memoryRecycler;
public final MemoryMode memoryMode;

public HollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
}

public abstract void destroy();
}
@@ -0,0 +1,75 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

public abstract class HollowTypeDataElementsJoiner<T extends HollowTypeDataElements> {
public final int fromMask;
public final int fromOrdinalShift;
public final T[] from;

public T to;

public HollowTypeDataElementsJoiner(T[] from) {
this.from = from;
this.fromMask = from.length - 1;
this.fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);

if (from.length<=0 || !((from.length&(from.length-1))==0)) {
throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
}

for (int i=0;i<from.length;i++) {
if (from[i].maxOrdinal == -1) {
continue;
}
if (from[i].maxOrdinal > (1<<29)
|| from[i].maxOrdinal != 0 && (from.length > (1<<29)/from[i].maxOrdinal)
|| from[i].maxOrdinal * from.length + i > (1<<29)) {
throw new IllegalArgumentException("Too large to join, maxOrdinal would exceed 2<<29");
}
}

for (HollowTypeDataElements elements : from) {
if (elements.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}
}

public T join() {

initToElements();
to.maxOrdinal = -1;

populateStats();

copyRecords();

GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
for (int i=0;i<from.length;i++) {
fromRemovals[i] = from[i].encodedRemovals;
}
to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);

return to;
}

/**
* Initialize the target data elements.
*/
public abstract void initToElements();

/**
* Populate the stats of the target data elements.
*/
public abstract void populateStats();

/**
* Copy records from the source data elements to the target data elements.
*/
public abstract void copyRecords();


}
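For intuition, the joiner's `fromMask`/`fromOrdinalShift` fields implement an interleaved ordinal mapping; a small self-contained sketch for two source data elements:

```java
public class JoinMappingSketch {
    public static void main(String[] args) {
        int numSources = 2; // must be a power of 2
        int fromMask = numSources - 1;                                        // 0b1
        int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSources); // 1

        for (int joinedOrdinal = 0; joinedOrdinal < 6; joinedOrdinal++) {
            int fromIndex = joinedOrdinal & fromMask;            // which source data elements
            int fromOrdinal = joinedOrdinal >> fromOrdinalShift; // ordinal within that source
            // joined 0 -> (source 0, ordinal 0), joined 1 -> (source 1, ordinal 0),
            // joined 2 -> (source 0, ordinal 1), joined 3 -> (source 1, ordinal 1), ...
            System.out.println(joinedOrdinal + " -> source " + fromIndex + ", ordinal " + fromOrdinal);
        }
    }
}
```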
@@ -0,0 +1,73 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

/**
 * Split one {@code HollowTypeDataElements} into multiple {@code HollowTypeDataElements}s.
 * Ordinals are remapped and corresponding data is copied over.
 * The original data elements are not destroyed.
 * The no. of splits must be a power of 2.
 */
public abstract class HollowTypeDataElementsSplitter<T extends HollowTypeDataElements> {
public final int numSplits;
public final int toMask;
public final int toOrdinalShift;
public final T from;

public T[] to;

public HollowTypeDataElementsSplitter(T from, int numSplits) {
this.from = from;
this.numSplits = numSplits;
this.toMask = numSplits - 1;
this.toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
throw new IllegalStateException("Must split by power of 2");
}

if (from.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}

public T[] split() {

initToElements();
for(int i=0;i<to.length;i++) {
to[i].maxOrdinal = -1;
}

populateStats();

copyRecords();

if (from.encodedRemovals != null) {
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
for(int i=0;i<to.length;i++) {
to[i].encodedRemovals = splitRemovals[i];
}
}

return to;
}

/**
* Initialize the target data elements.
*/
public abstract void initToElements();

/**
* Populate the stats of the target data elements.
*/
public abstract void populateStats();

/**
* Copy records from the source data elements to the target data elements.
*/
public abstract void copyRecords();


}
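The splitter applies the inverse mapping: source ordinal `o` lands in split `o & toMask` at ordinal `o >> toOrdinalShift`. A sketch for `numSplits = 2`:

```java
public class SplitMappingSketch {
    public static void main(String[] args) {
        int numSplits = 2; // must be a power of 2
        int toMask = numSplits - 1;
        int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

        for (int ordinal = 0; ordinal < 6; ordinal++) {
            int toIndex = ordinal & toMask;            // which split receives the record
            int toOrdinal = ordinal >> toOrdinalShift; // its ordinal within that split
            System.out.println(ordinal + " -> split " + toIndex + ", ordinal " + toOrdinal);
        }
    }
}
```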
@@ -31,7 +31,7 @@
import java.util.stream.Stream;

/**
* A HollowTypeReadState contains and is the root handle to all of the records of a specific type in
* A HollowTypeReadState contains and is the root handle to all the records of a specific type in
* a {@link HollowReadStateEngine}.
*/
public abstract class HollowTypeReadState implements HollowTypeDataAccess {
@@ -121,16 +121,10 @@ public BitSet getPreviousOrdinals() {
*/
public abstract int maxOrdinal();

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards;
}

public HollowSchema getSchema() {
return schema;
}
@@ -206,4 +200,18 @@ public long getApproximateShardSizeInBytes() {
*/
public abstract int numShards();

public abstract ShardsHolder getShardsVolatile();

public abstract void updateShardsVolatile(HollowTypeReadStateShard[] shards);

public abstract HollowTypeDataElements[] createTypeDataElements(int len);

public abstract HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift);

public void destroyOriginalDataElements(HollowTypeDataElements dataElements) {
dataElements.destroy();
if (dataElements.encodedRemovals != null) {
dataElements.encodedRemovals.destroy();
}
}
}
@@ -0,0 +1,8 @@
package com.netflix.hollow.core.read.engine;

public interface HollowTypeReadStateShard {

HollowTypeDataElements getDataElements();

int getShardOrdinalShift();
}
@@ -0,0 +1,191 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.read.engine.list.HollowListTypeReadState;
import com.netflix.hollow.core.read.engine.list.HollowListTypeReshardingStrategy;
import com.netflix.hollow.core.read.engine.map.HollowMapTypeReadState;
import com.netflix.hollow.core.read.engine.map.HollowMapTypeReshardingStrategy;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReshardingStrategy;
import com.netflix.hollow.core.read.engine.set.HollowSetTypeReadState;
import com.netflix.hollow.core.read.engine.set.HollowSetTypeReshardingStrategy;
import java.util.Arrays;

public abstract class HollowTypeReshardingStrategy {
private final static HollowTypeReshardingStrategy OBJECT_RESHARDING_STRATEGY = new HollowObjectTypeReshardingStrategy();
private final static HollowTypeReshardingStrategy LIST_RESHARDING_STRATEGY = new HollowListTypeReshardingStrategy();
private final static HollowTypeReshardingStrategy SET_RESHARDING_STRATEGY = new HollowSetTypeReshardingStrategy();
private final static HollowTypeReshardingStrategy MAP_RESHARDING_STRATEGY = new HollowMapTypeReshardingStrategy();

public abstract HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataElements from, int shardingFactor);

public abstract HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from);

public static HollowTypeReshardingStrategy getInstance(HollowTypeReadState typeState) {
if (typeState instanceof HollowObjectTypeReadState) {
return OBJECT_RESHARDING_STRATEGY;
} else if (typeState instanceof HollowListTypeReadState) {
return LIST_RESHARDING_STRATEGY;
} else if (typeState instanceof HollowSetTypeReadState) {
return SET_RESHARDING_STRATEGY;
} else if (typeState instanceof HollowMapTypeReadState) {
return MAP_RESHARDING_STRATEGY;
} else {
throw new IllegalArgumentException("Unsupported type state: " + typeState.getClass().getName());
}
}

/**
* Reshards this type state to the desired shard count using O(shard size) space while supporting concurrent reads
* into the underlying data elements.
*
* @param typeState The type state to reshard
* @param prevNumShards The current number of shards in typeState
* @param newNumShards The desired number of shards for typeState
*/
public void reshard(HollowTypeReadState typeState, int prevNumShards, int newNumShards) {
int shardingFactor = shardingFactor(prevNumShards, newNumShards);
HollowTypeDataElements[] newDataElements;
int[] shardOrdinalShifts;

try {
if (newNumShards > prevNumShards) { // split existing shards
// Step 1: Grow the number of shards. Each original shard will result in N child shards where N is the sharding factor.
// The child shards will reference into the existing data elements as-is, and reuse existing shardOrdinalShift.
// However since the shards array is resized, a read will map into the new shard index, as a result a subset of
// ordinals in each shard will be accessed. In the next "splitting" step, the data elements in these new shards
// will be filtered to only retain the subset of ordinals that are actually accessed.
//
// This is an atomic update to shardsVolatile: full construction happens-before the store to shardsVolatile,
// in other words a fully constructed object as visible to this thread will be visible to other threads that
// load the new shardsVolatile.
typeState.updateShardsVolatile(expandWithOriginalDataElements(typeState.getShardsVolatile(), shardingFactor));

// Step 2: Split each original data element into N child data elements where N is the sharding factor.
// Then update each of the N child shards with the respective split of data element, this will be
// sufficient to serve all reads into this shard. Once all child shards for a pre-split parent
// shard have been assigned the split data elements, the parent data elements can be discarded.
for (int i = 0; i < prevNumShards; i++) {
HollowTypeDataElements originalDataElements = typeState.getShardsVolatile().getShards()[i].getDataElements();

typeState.updateShardsVolatile(splitDataElementsForOneShard(typeState, i, prevNumShards, shardingFactor));

typeState.destroyOriginalDataElements(originalDataElements);
}
// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a split of original data elements.

} else { // join existing shards
// Step 1: Join N data elements to create one, where N is the sharding factor. Then update each of the
// N shards to reference the joined result, but with a new shardOrdinalShift.
// Reads will continue to reference the same shard index as before, but the new shardOrdinalShift
// will help these reads land at the right ordinal in the joined shard. When all N old shards
// corresponding to one new shard have been updated, the N pre-join data elements can be destroyed.
for (int i = 0; i < newNumShards; i++) {
HollowTypeDataElements destroyCandidates[] = joinCandidates(typeState, i, shardingFactor);

typeState.updateShardsVolatile(joinDataElementsForOneShard(typeState, i, shardingFactor)); // atomic update to shardsVolatile

for (int j = 0; j < shardingFactor; j++) {
typeState.destroyOriginalDataElements(destroyCandidates[j]);
}
}

// Step 2: Resize the shards array to only keep the first newNumShards shards.
newDataElements = typeState.createTypeDataElements(typeState.getShardsVolatile().getShards().length);
shardOrdinalShifts = new int[typeState.getShardsVolatile().getShards().length];
copyShardDataElements(typeState.getShardsVolatile(), newDataElements, shardOrdinalShifts);

HollowTypeReadStateShard[] newShards = Arrays.copyOfRange(typeState.getShardsVolatile().getShards(), 0, newNumShards);
typeState.updateShardsVolatile(newShards);

// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a join of original data elements.
}
} catch (Exception e) {
throw new RuntimeException("Error in re-sharding", e);
}
}

/**
* Given old and new numShards, this method returns the shard resizing multiplier.
*/
public static int shardingFactor(int oldNumShards, int newNumShards) {
if (newNumShards <= 0 || oldNumShards <= 0 || newNumShards == oldNumShards) {
throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards);
}

boolean isNewGreater = newNumShards > oldNumShards;
int dividend = isNewGreater ? newNumShards : oldNumShards;
int divisor = isNewGreater ? oldNumShards : newNumShards;

if (dividend % divisor != 0) {
throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards);
}
return dividend / divisor;
}

private void copyShardDataElements(ShardsHolder from, HollowTypeDataElements[] newDataElements, int[] shardOrdinalShifts) {
for (int i=0; i<from.getShards().length; i++) {
newDataElements[i] = from.getShards()[i].getDataElements();
shardOrdinalShifts[i] = from.getShards()[i].getShardOrdinalShift();
}
}

private HollowTypeDataElements[] joinCandidates(HollowTypeReadState typeState, int indexIntoShards, int shardingFactor) {
HollowTypeReadStateShard[] shards = typeState.getShardsVolatile().getShards();
HollowTypeDataElements[] result = typeState.createTypeDataElements(shardingFactor);
int newNumShards = shards.length / shardingFactor;
for (int i=0; i<shardingFactor; i++) {
result[i] = shards[indexIntoShards + (newNumShards*i)].getDataElements();
}
return result;
}

public HollowTypeReadStateShard[] joinDataElementsForOneShard(HollowTypeReadState typeState, int currentIndex, int shardingFactor) {
ShardsHolder shardsHolder = typeState.getShardsVolatile();
int newNumShards = shardsHolder.getShards().length / shardingFactor;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeDataElements[] joinCandidates = joinCandidates(typeState, currentIndex, shardingFactor);
HollowTypeDataElementsJoiner joiner = createDataElementsJoiner(joinCandidates);
HollowTypeDataElements joined = joiner.join();

HollowTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.getShards(), shardsHolder.getShards().length);
for (int i=0; i<shardingFactor; i++) {
newShards[currentIndex + (newNumShards*i)] = typeState.createTypeReadStateShard(typeState.getSchema(), joined, newShardOrdinalShift);
}

return newShards;
}

public HollowTypeReadStateShard[] expandWithOriginalDataElements(ShardsHolder shardsHolder, int shardingFactor) {
int prevNumShards = shardsHolder.getShards().length;
int newNumShards = prevNumShards * shardingFactor;
HollowTypeReadStateShard[] newShards = new HollowTypeReadStateShard[newNumShards];

for(int i=0; i<prevNumShards; i++) {
for (int j=0; j<shardingFactor; j++) {
newShards[i+(prevNumShards*j)] = shardsHolder.getShards()[i];
}
}
return newShards;
}

public HollowTypeReadStateShard[] splitDataElementsForOneShard(HollowTypeReadState typeState, int currentIndex, int prevNumShards, int shardingFactor) {
ShardsHolder shardsHolder = typeState.getShardsVolatile();
int newNumShards = shardsHolder.getShards().length;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeDataElements dataElementsToSplit = shardsHolder.getShards()[currentIndex].getDataElements();
HollowTypeDataElementsSplitter splitter = createDataElementsSplitter(dataElementsToSplit, shardingFactor);
HollowTypeDataElements[] splits = splitter.split();

HollowTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.getShards(), shardsHolder.getShards().length);
for (int i = 0; i < shardingFactor; i ++) {
newShards[currentIndex + (prevNumShards*i)] = typeState.createTypeReadStateShard(typeState.getSchema(), splits[i], newShardOrdinalShift);
}
return newShards;
}
}
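A hedged usage sketch of the strategy above; `typeState` is assumed to be a sharded type state obtained from a `HollowReadStateEngine`:

```java
// Sketch: doubling a type state's shard count via the strategy above.
HollowTypeReshardingStrategy strategy = HollowTypeReshardingStrategy.getInstance(typeState);
int currNumShards = typeState.numShards();  // e.g. 4
int newNumShards = currNumShards * 2;       // old and new counts must divide evenly
// shardingFactor(4, 8) == 2: each existing shard is split in two, and readers
// are served concurrently throughout via the atomic shardsVolatile updates.
strategy.reshard(typeState, currNumShards, newNumShards);
```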
@@ -0,0 +1,9 @@
package com.netflix.hollow.core.read.engine;

public abstract class ShardsHolder {

public abstract HollowTypeReadStateShard[] getShards();

public abstract int getShardNumberMask();

}
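A reader resolves a record's shard with the holder's mask; because shard counts are powers of 2, a bitwise AND replaces the modulo. A sketch:

```java
// Sketch: shard selection with a power-of-2 shard count.
int numShards = 4;
int shardNumberMask = numShards - 1;        // 0b11
int ordinal = 13;
int shardIndex = ordinal & shardNumberMask; // 13 & 3 == 1, i.e. 13 % 4
// The record is then read at (ordinal >> shardOrdinalShift) within that shard.
```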
@@ -81,8 +81,7 @@ public IntMap getOrdinalMapping() {
}

public HollowListTypeReadState createHistoricalTypeReadState() {
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema(), 1);
historicalTypeState.setCurrentData(historicalDataElements);
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(typeState.getSchema(), historicalDataElements);
return historicalTypeState;
}

@@ -100,8 +99,11 @@ private void populateStats() {
historicalDataElements.maxOrdinal = removedEntryCount - 1;
historicalDataElements.totalNumberOfElements = totalElementCount;
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement;

for (int i=0;i<stateEngineDataElements.length;i++) {
if (stateEngineDataElements[i].bitsPerElement > historicalDataElements.bitsPerElement) {
historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement;
}
}
ordinalMapping = new IntMap(removedEntryCount);
}

@@ -110,8 +112,8 @@ private void copyRecord(int ordinal) {
int shardOrdinal = ordinal >> shardOrdinalShift;

long bitsPerElement = stateEngineDataElements[shard].bitsPerElement;
long fromStartElement = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].listPointerData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromEndElement = stateEngineDataElements[shard].listPointerData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromStartElement = stateEngineDataElements[shard].getStartElement(shardOrdinal);
long fromEndElement = stateEngineDataElements[shard].getEndElement(shardOrdinal);
long size = fromEndElement - fromStartElement;

historicalDataElements.elementData.copyBits(stateEngineDataElements[shard].elementData, fromStartElement * bitsPerElement, nextStartElement * bitsPerElement, size * bitsPerElement);
@@ -23,6 +23,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import java.io.IOException;

/**
@@ -31,30 +32,21 @@
* During a delta, the HollowListTypeReadState will create a new HollowListTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowListTypeDataElements {

int maxOrdinal;
public class HollowListTypeDataElements extends HollowTypeDataElements {

FixedLengthData listPointerData;
FixedLengthData elementData;

GapEncodedVariableLengthIntegerReader encodedAdditions;
GapEncodedVariableLengthIntegerReader encodedRemovals;

int bitsPerListPointer;
int bitsPerElement;
int bitsPerElement = 0;
long totalNumberOfElements = 0;

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

public HollowListTypeDataElements(ArraySegmentRecycler memoryRecycler) {
this(MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowListTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
super(memoryMode, memoryRecycler);
}

void readSnapshot(HollowBlobInput in) throws IOException {
@@ -109,9 +101,31 @@ public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataEl
new HollowListDeltaApplicator(fromData, deltaData, this).applyDelta();
}

@Override
public void destroy() {
FixedLengthDataFactory.destroy(listPointerData, memoryRecycler);
FixedLengthDataFactory.destroy(elementData, memoryRecycler);
}

long getStartElement(int ordinal) {
return ordinal == 0 ? 0 : listPointerData.getElementValue(((long)(ordinal-1) * bitsPerListPointer), bitsPerListPointer);
}

long getEndElement(int ordinal) {
return listPointerData.getElementValue((long)ordinal * bitsPerListPointer, bitsPerListPointer);
}

void copyElementsFrom(long startElement, HollowListTypeDataElements src, long srcStartElement, long srcEndElement) {
if (bitsPerElement == src.bitsPerElement) {
// fast path can bulk copy elements
long numElements = srcEndElement - srcStartElement;
elementData.copyBits(src.elementData, srcStartElement * bitsPerElement, startElement * bitsPerElement, numElements * bitsPerElement);
} else {
for (long element=srcStartElement;element<srcEndElement;element++) {
long elementVal = src.elementData.getElementValue(element * src.bitsPerElement, src.bitsPerElement);
elementData.setElementValue(startElement * bitsPerElement, bitsPerElement, elementVal);
startElement++;
}
}
}
}
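To make `getStartElement`/`getEndElement` concrete: `listPointerData` stores only end offsets, so a list's bounds are recovered by differencing against the previous ordinal. A sketch with a plain array standing in for the fixed-length data:

```java
// Sketch: three lists of sizes 2, 3, and 0 pack their elements contiguously;
// the pointer data stores the running end offsets [2, 5, 5].
long[] listPointers = {2, 5, 5};
int ordinal = 1;
long startElement = (ordinal == 0) ? 0 : listPointers[ordinal - 1]; // 2
long endElement = listPointers[ordinal];                            // 5
long size = endElement - startElement;                              // 3 elements
```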
@@ -0,0 +1,73 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;


/**
* Join multiple {@code HollowListTypeDataElements}s into 1 {@code HollowListTypeDataElements}.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* The no. of passed data elements must be a power of 2.
*/
class HollowListTypeDataElementsJoiner extends HollowTypeDataElementsJoiner<HollowListTypeDataElements> {

public HollowListTypeDataElementsJoiner(HollowListTypeDataElements[] from) {
super(from);
}

@Override
public void initToElements() {
this.to = new HollowListTypeDataElements(from[0].memoryMode, from[0].memoryRecycler);
}

@Override
public void populateStats() {
for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
int mappedMaxOrdinal = from[fromIndex].maxOrdinal == -1 ? -1 : (from[fromIndex].maxOrdinal * from.length) + fromIndex;
to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);
if (from[fromIndex].bitsPerElement > to.bitsPerElement) {
// uneven bitsPerElement could be the case for consumers that skip type shards with no additions, so pick max across all shards
to.bitsPerElement = from[fromIndex].bitsPerElement;
}
}

long totalOfListSizes = 0;
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

long startElement = from[fromIndex].getStartElement(fromOrdinal);
long endElement = from[fromIndex].getEndElement(fromOrdinal);
long numElements = endElement - startElement;
totalOfListSizes += numElements;

}
to.bitsPerListPointer = totalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalOfListSizes);
to.totalNumberOfElements = totalOfListSizes;
}

@Override
public void copyRecords() {
long elementCounter = 0;

to.listPointerData = FixedLengthDataFactory.get((long)to.bitsPerListPointer * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);
to.elementData = FixedLengthDataFactory.get(to.bitsPerElement * to.totalNumberOfElements, to.memoryMode, to.memoryRecycler);

for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shard for e.g. when consumers skip type shards with no additions
HollowListTypeDataElements source = from[fromIndex];
long startElement = source.getStartElement(fromOrdinal);
long endElement = source.getEndElement(fromOrdinal);

long numElements = endElement - startElement;
to.copyElementsFrom(elementCounter, source, startElement, endElement);
elementCounter += numElements;
}
to.listPointerData.setElementValue((long)to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
}
}
}
@@ -0,0 +1,83 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;

/**
* Split a {@code HollowListTypeDataElements} into multiple {@code HollowListTypeDataElements}s.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* {@code numSplits} must be a power of 2.
*/
public class HollowListTypeDataElementsSplitter extends HollowTypeDataElementsSplitter<HollowListTypeDataElements> {

public HollowListTypeDataElementsSplitter(HollowListTypeDataElements from, int numSplits) {
super(from, numSplits);
}

@Override
public void initToElements() {
this.to = new HollowListTypeDataElements[numSplits];
for(int i=0;i<to.length;i++) {
to[i] = new HollowListTypeDataElements(from.memoryMode, from.memoryRecycler);
}
}

@Override
public void populateStats() {
long[] totalOfListSizes = new long[numSplits];

// count elements per split
for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
int toIndex = ordinal & toMask;
int toOrdinal = ordinal >> toOrdinalShift;
to[toIndex].maxOrdinal = toOrdinal;

long startElement = from.getStartElement(ordinal);
long endElement = from.getEndElement(ordinal);
long numElements = endElement - startElement;
totalOfListSizes[toIndex] += numElements;
}

long maxShardTotalOfListSizes = 0;
for(int toIndex=0;toIndex<numSplits;toIndex++) {
if(totalOfListSizes[toIndex] > maxShardTotalOfListSizes)
maxShardTotalOfListSizes = totalOfListSizes[toIndex];
}

for(int toIndex=0;toIndex<numSplits;toIndex++) {
HollowListTypeDataElements target = to[toIndex];
target.bitsPerElement = from.bitsPerElement; // retained
target.bitsPerListPointer = maxShardTotalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(maxShardTotalOfListSizes);
target.totalNumberOfElements = totalOfListSizes[toIndex]; // useful for heap usage stats
}
}

@Override
public void copyRecords() {
int numSplits = to.length;
long elementCounter[] = new long[numSplits];

for(int toIndex=0;toIndex<numSplits;toIndex++) {
HollowListTypeDataElements target = to[toIndex];
target.listPointerData = FixedLengthDataFactory.get((long)target.bitsPerListPointer * (target.maxOrdinal + 1), target.memoryMode, target.memoryRecycler);
target.elementData = FixedLengthDataFactory.get(target.bitsPerElement * target.totalNumberOfElements, target.memoryMode, target.memoryRecycler);
}

// count elements per split
for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
int toIndex = ordinal & toMask;
int toOrdinal = ordinal >> toOrdinalShift;

long startElement = from.getStartElement(ordinal);
long endElement = from.getEndElement(ordinal);
HollowListTypeDataElements target = to[toIndex];

long numElements = endElement - startElement;
target.copyElementsFrom(elementCounter[toIndex], from, startElement, endElement);
elementCounter[toIndex] += numElements;

target.listPointerData.setElementValue((long)target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]);
}
}
}
@@ -20,6 +20,7 @@
import com.netflix.hollow.api.sampling.HollowListSampler;
import com.netflix.hollow.api.sampling.HollowSampler;
import com.netflix.hollow.api.sampling.HollowSamplingDirector;
import com.netflix.hollow.core.memory.HollowUnsafeHandle;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.encoding.VarInt;
@@ -28,8 +29,11 @@
import com.netflix.hollow.core.read.dataaccess.HollowListTypeDataAccess;
import com.netflix.hollow.core.read.engine.HollowCollectionTypeReadState;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import com.netflix.hollow.core.read.engine.HollowTypeReadState;
import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener;
import com.netflix.hollow.core.read.engine.ShardsHolder;
import com.netflix.hollow.core.read.engine.SnapshotPopulatedOrdinalsReader;
import com.netflix.hollow.core.read.filter.HollowFilterConfig;
import com.netflix.hollow.core.read.iterator.HollowListOrdinalIterator;
@@ -44,75 +48,81 @@
* A {@link HollowTypeReadState} for LIST type records.
*/
public class HollowListTypeReadState extends HollowCollectionTypeReadState implements HollowListTypeDataAccess {

private final HollowListSampler sampler;

private final int shardNumberMask;
private final int shardOrdinalShift;
private final HollowListTypeReadStateShard shards[];

private int maxOrdinal;

public HollowListTypeReadState(HollowReadStateEngine stateEngine, HollowListSchema schema, int numShards) {
this(stateEngine, MemoryMode.ON_HEAP, schema, numShards);
volatile HollowListTypeShardsHolder shardsVolatile;

@Override
public ShardsHolder getShardsVolatile() {
return shardsVolatile;
}

@Override
public void updateShardsVolatile(HollowTypeReadStateShard[] shards) {
this.shardsVolatile = new HollowListTypeShardsHolder(shards);
}

@Override
public HollowTypeDataElements[] createTypeDataElements(int len) {
return new HollowListTypeDataElements[len];
}

@Override
public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) {
return new HollowListTypeReadStateShard((HollowListTypeDataElements) dataElements, shardOrdinalShift);
}

public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema, int numShards) {
public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema) {
super(stateEngine, memoryMode, schema);
this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);
this.shardNumberMask = numShards - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);

if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");

HollowListTypeReadStateShard shards[] = new HollowListTypeReadStateShard[numShards];
for(int i=0;i<shards.length;i++)
shards[i] = new HollowListTypeReadStateShard();

this.shards = shards;
this.shardsVolatile = null;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
public HollowListTypeReadState(HollowListSchema schema, HollowListTypeDataElements dataElements) {
super(null, MemoryMode.ON_HEAP, schema);
this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);

HollowListTypeReadStateShard newShard = new HollowListTypeReadStateShard(dataElements, 0);
this.shardsVolatile = new HollowListTypeShardsHolder(new HollowListTypeReadStateShard[] {newShard});
this.maxOrdinal = dataElements.maxOrdinal;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
if(numShards > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i=0;i<shards.length;i++) {
HollowListTypeDataElements snapshotData = new HollowListTypeDataElements(memoryMode, memoryRecycler);
snapshotData.readSnapshot(in);
shards[i].setCurrentData(snapshotData);

HollowListTypeReadStateShard[] newShards = new HollowListTypeReadStateShard[numShards];
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
for(int i=0; i<numShards; i++) {
HollowListTypeDataElements shardDataElements = new HollowListTypeDataElements(memoryMode, memoryRecycler);
shardDataElements.readSnapshot(in);
newShards[i] = new HollowListTypeReadStateShard(shardDataElements, shardOrdinalShift);
}

if(shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;

shardsVolatile = new HollowListTypeShardsHolder(newShards);

if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;

SnapshotPopulatedOrdinalsReader.readOrdinals(in, stateListeners);
}

@Override
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
if (shouldReshard(shards.length, deltaNumShards)) {
throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+ ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
}
if(shards.length > 1)
if(shardsVolatile.shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i=0; i<shards.length; i++) {
for(int i=0; i<shardsVolatile.shards.length; i++) {
HollowListTypeDataElements deltaData = new HollowListTypeDataElements(memoryMode, memoryRecycler);
deltaData.readDelta(in);
if(stateEngine.isSkipTypeShardUpdateWithNoAdditions() && deltaData.encodedAdditions.isEmpty()) {

if(!deltaData.encodedRemovals.isEmpty())
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shards.length);
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);

HollowListTypeDataElements currentData = shards[i].currentDataElements();
HollowListTypeDataElements currentData = shardsVolatile.shards[i].dataElements;
GapEncodedVariableLengthIntegerReader oldRemovals = currentData.encodedRemovals == null ? GapEncodedVariableLengthIntegerReader.EMPTY_READER : currentData.encodedRemovals;
if(oldRemovals.isEmpty()) {
currentData.encodedRemovals = deltaData.encodedRemovals;
@@ -128,19 +138,22 @@ public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecy
deltaData.encodedAdditions.destroy();
} else {
HollowListTypeDataElements nextData = new HollowListTypeDataElements(memoryMode, memoryRecycler);
HollowListTypeDataElements oldData = shards[i].currentDataElements();
HollowListTypeDataElements oldData = shardsVolatile.shards[i].dataElements;
nextData.applyDelta(oldData, deltaData);
shards[i].setCurrentData(nextData);
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shards.length);

HollowListTypeReadStateShard newShard = new HollowListTypeReadStateShard(nextData, shardsVolatile.shards[i].shardOrdinalShift);
shardsVolatile = new HollowListTypeShardsHolder(shardsVolatile.shards, newShard, i);

notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);
deltaData.encodedAdditions.destroy();
oldData.destroy();
}
deltaData.destroy();
stateEngine.getMemoryRecycler().swap();
}

if(shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;
if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;
}

public static void discardSnapshot(HollowBlobInput in, int numShards) throws IOException {
@@ -170,22 +183,60 @@ public int maxOrdinal() {
@Override
public int getElementOrdinal(int ordinal, int listIndex) {
sampler.recordGet();
return shards[ordinal & shardNumberMask].getElementOrdinal(ordinal >> shardOrdinalShift, listIndex);

HollowListTypeShardsHolder shardsHolder;
HollowListTypeReadStateShard shard;
int shardOrdinal;
int elementOrdinal;
long startElement;
long endElement;

do {
do {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
shardOrdinal = ordinal >> shard.shardOrdinalShift;

startElement = shard.dataElements.getStartElement(shardOrdinal);
endElement = shard.dataElements.getEndElement(shardOrdinal);
} while(readWasUnsafe(shardsHolder, ordinal, shard));

elementOrdinal = shard.getElementOrdinal(startElement, endElement, listIndex);
} while(readWasUnsafe(shardsHolder, ordinal, shard));

return elementOrdinal;
}

@Override
public int size(int ordinal) {
sampler.recordSize();
return shards[ordinal & shardNumberMask].size(ordinal >> shardOrdinalShift);

HollowListTypeShardsHolder shardsHolder;
HollowListTypeReadStateShard shard;
int size;

do {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];

size = shard.size(ordinal >> shard.shardOrdinalShift);
} while(readWasUnsafe(shardsHolder, ordinal, shard));
return size;
}

@Override
public HollowOrdinalIterator ordinalIterator(int ordinal) {
sampler.recordIterator();

return new HollowListOrdinalIterator(ordinal, this);
}

private boolean readWasUnsafe(HollowListTypeShardsHolder shardsHolder, int ordinal, HollowListTypeReadStateShard shard) {
HollowUnsafeHandle.getUnsafe().loadFence();
HollowListTypeShardsHolder currShardsHolder = shardsVolatile;
return shardsHolder != currShardsHolder
&& (shard != currShardsHolder.shards[ordinal & currShardsHolder.shardNumberMask]);
}

@Override
public HollowSampler getSampler() {
return sampler;
@@ -209,39 +260,39 @@ public void ignoreUpdateThreadForSampling(Thread t) {
@Override
protected void invalidate() {
stateListeners = EMPTY_LISTENERS;
for(int i=0;i<shards.length;i++)
shards[i].invalidate();
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
int numShards = shards.length;
HollowListTypeReadStateShard[] newShards = new HollowListTypeReadStateShard[numShards];
for (int i=0;i<numShards;i++) {
newShards[i] = new HollowListTypeReadStateShard(null, shards[i].shardOrdinalShift);
}
this.shardsVolatile = new HollowListTypeShardsHolder(newShards);
}

HollowListTypeDataElements[] currentDataElements() {
HollowListTypeDataElements currentDataElements[] = new HollowListTypeDataElements[shards.length];

for(int i=0; i<shards.length; i++)
currentDataElements[i] = shards[i].currentDataElements();

return currentDataElements;
}

void setCurrentData(HollowListTypeDataElements data) {
if(shards.length > 1)
throw new UnsupportedOperationException("Cannot directly set data on sharded type state");
shards[0].setCurrentData(data);
maxOrdinal = data.maxOrdinal;
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
HollowListTypeDataElements[] elements = new HollowListTypeDataElements[shards.length];
for (int i=0;i<shards.length;i++) {
elements[i] = shards[i].dataElements;
}
return elements;
}

@Override
protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema) {
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
if(!getSchema().equals(withSchema))
throw new IllegalArgumentException("HollowListTypeReadState cannot calculate checksum with unequal schemas: " + getSchema().getName());

BitSet populatedOrdinals = getListener(PopulatedOrdinalListener.class).getPopulatedOrdinals();

for(int i=0; i<shards.length; i++)
shards[i].applyToChecksum(checksum, populatedOrdinals, i, shards.length);
shards[i].applyShardToChecksum(checksum, populatedOrdinals, i, shards.length);
}

@Override
public long getApproximateHeapFootprintInBytes() {
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
long totalApproximateHeapFootprintInBytes = 0;

for(int i=0; i<shards.length; i++)
@@ -252,6 +303,7 @@ public long getApproximateHeapFootprintInBytes() {

@Override
public long getApproximateHoleCostInBytes() {
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
long totalApproximateHoleCostInBytes = 0;

BitSet populatedOrdinals = getPopulatedOrdinals();
@@ -264,7 +316,6 @@ public long getApproximateHoleCostInBytes() {

@Override
public int numShards() {
return shards.length;
return this.shardsVolatile.shards.length;
}

}
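The `do { ... } while(readWasUnsafe(...))` loops above implement an optimistic concurrent read: snapshot `shardsVolatile`, read through the snapshot, then re-check (after a load fence) that the shard consulted was not swapped out mid-read. A simplified self-contained sketch of the discipline, omitting the fence and the per-shard identity check:

```java
// Simplified sketch (not Hollow API) of the optimistic-read retry pattern.
final class OptimisticReader {
    private volatile long[] dataVolatile = {10, 20, 30};

    long read(int index) {
        long[] snapshot;
        long value;
        do {
            snapshot = dataVolatile; // capture the current data reference
            value = snapshot[index]; // read against the captured snapshot
        } while (snapshot != dataVolatile); // retry if a writer swapped it mid-read;
        // in Hollow, old data elements may be destroyed after the swap, so a
        // read that raced with the swap must not be trusted.
        return value;
    }
}
```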
@@ -18,99 +18,50 @@

import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;

import com.netflix.hollow.core.memory.HollowUnsafeHandle;
import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.tools.checksum.HollowChecksum;
import java.util.BitSet;

class HollowListTypeReadStateShard {
class HollowListTypeReadStateShard implements HollowTypeReadStateShard {

private volatile HollowListTypeDataElements currentDataVolatile;
final HollowListTypeDataElements dataElements;
final int shardOrdinalShift;

public int getElementOrdinal(int ordinal, int listIndex) {
HollowListTypeDataElements currentData;
int elementOrdinal;

do {
long startElement;
long endElement;

do {
currentData = this.currentDataVolatile;

if (ordinal == 0) {
startElement = 0;
endElement = currentData.listPointerData.getElementValue(0, currentData.bitsPerListPointer);
} else {
long endFixedLengthOffset = (long)ordinal * currentData.bitsPerListPointer;
long startFixedLengthOffset = endFixedLengthOffset - currentData.bitsPerListPointer;
startElement = currentData.listPointerData.getElementValue(startFixedLengthOffset, currentData.bitsPerListPointer);
endElement = currentData.listPointerData.getElementValue(endFixedLengthOffset, currentData.bitsPerListPointer);
}
} while(readWasUnsafe(currentData));

long elementIndex = startElement + listIndex;

if(elementIndex >= endElement)
throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement));

elementOrdinal = (int)currentData.elementData.getElementValue(elementIndex * currentData.bitsPerElement, currentData.bitsPerElement);
} while(readWasUnsafe(currentData));

return elementOrdinal;
@Override
public HollowListTypeDataElements getDataElements() {
return dataElements;
}

public int size(int ordinal) {
HollowListTypeDataElements currentData;
int size;

do {
currentData = this.currentDataVolatile;

long startElement;
long endElement;
if (ordinal == 0) {
startElement = 0;
endElement = currentData.listPointerData.getElementValue(0, currentData.bitsPerListPointer);
} else {
long endFixedLengthOffset = (long)ordinal * currentData.bitsPerListPointer;
long startFixedLengthOffset = endFixedLengthOffset - currentData.bitsPerListPointer;
startElement = currentData.listPointerData.getElementValue(startFixedLengthOffset, currentData.bitsPerListPointer);
endElement = currentData.listPointerData.getElementValue(endFixedLengthOffset, currentData.bitsPerListPointer);
}

size = (int)(endElement - startElement);
} while(readWasUnsafe(currentData));

return size;
@Override
public int getShardOrdinalShift() {
return shardOrdinalShift;
}

void invalidate() {
setCurrentData(null);
public HollowListTypeReadStateShard(HollowListTypeDataElements dataElements, int shardOrdinalShift) {
this.shardOrdinalShift = shardOrdinalShift;
this.dataElements = dataElements;
}

HollowListTypeDataElements currentDataElements() {
return currentDataVolatile;
}

private boolean readWasUnsafe(HollowListTypeDataElements data) {
HollowUnsafeHandle.getUnsafe().loadFence();
return data != currentDataVolatile;
}
public int size(int ordinal) {
long startElement = dataElements.getStartElement(ordinal);
long endElement = dataElements.getEndElement(ordinal);
int size = (int)(endElement - startElement);

void setCurrentData(HollowListTypeDataElements data) {
this.currentDataVolatile = data;
return size;
}

protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) {
protected void applyShardToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) {
int ordinal = populatedOrdinals.nextSetBit(shardNumber);
while(ordinal != ORDINAL_NONE) {
if((ordinal & (numShards - 1)) == shardNumber) {
int shardOrdinal = ordinal / numShards;
int size = size(shardOrdinal);

checksum.applyInt(ordinal);
long startElement = dataElements.getStartElement(shardOrdinal);
long endElement = dataElements.getEndElement(shardOrdinal);
for(int i=0;i<size;i++)
checksum.applyInt(getElementOrdinal(shardOrdinal, i));
checksum.applyInt(getElementOrdinal(startElement, endElement, i));

ordinal = ordinal + numShards;
} else {
@@ -122,22 +73,29 @@ protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals
}
}

int getElementOrdinal(long startElement, long endElement, int listIndex) {
long elementIndex = startElement + listIndex;
if(elementIndex >= endElement)
throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement));

int elementOrdinal = (int)dataElements.elementData.getElementValue(elementIndex * dataElements.bitsPerElement, dataElements.bitsPerElement);
return elementOrdinal;
}

public long getApproximateHeapFootprintInBytes() {
HollowListTypeDataElements currentData = currentDataVolatile;
long requiredListPointerBits = ((long)currentData.maxOrdinal + 1) * currentData.bitsPerListPointer;
long requiredElementBits = currentData.totalNumberOfElements * currentData.bitsPerElement;
long requiredListPointerBits = ((long)dataElements.maxOrdinal + 1) * dataElements.bitsPerListPointer;
long requiredElementBits = dataElements.totalNumberOfElements * dataElements.bitsPerElement;
long requiredBits = requiredListPointerBits + requiredElementBits;
return requiredBits / 8;
}

public long getApproximateHoleCostInBytes(BitSet populatedOrdinals, int shardNumber, int numShards) {
HollowListTypeDataElements currentData = currentDataVolatile;
long holeBits = 0;

int holeOrdinal = populatedOrdinals.nextClearBit(0);
while(holeOrdinal <= currentData.maxOrdinal) {
while(holeOrdinal <= dataElements.maxOrdinal) {
if((holeOrdinal & (numShards - 1)) == shardNumber)
holeBits += currentData.bitsPerListPointer;
holeBits += dataElements.bitsPerListPointer;

holeOrdinal = populatedOrdinals.nextClearBit(holeOrdinal + 1);
}
@@ -0,0 +1,18 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;
import com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy;

public class HollowListTypeReshardingStrategy extends HollowTypeReshardingStrategy {
@Override
public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataElements from, int shardingFactor) {
return new HollowListTypeDataElementsSplitter((HollowListTypeDataElements) from, shardingFactor);
}

@Override
public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) {
return new HollowListTypeDataElementsJoiner((HollowListTypeDataElements[]) from);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.ShardsHolder;

public class HollowListTypeShardsHolder extends ShardsHolder {
final HollowListTypeReadStateShard shards[];
final int shardNumberMask;

/**
* Thread-safe construction of a ShardsHolder with the given shards
* @param fromShards shards to be used
*/
public HollowListTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {
this.shards = new HollowListTypeReadStateShard[fromShards.length];
for (int i=0; i<fromShards.length; i++) {
this.shards[i] = (HollowListTypeReadStateShard) fromShards[i];
}
this.shardNumberMask = fromShards.length - 1;
}

/**
* Thread-safe construction of a ShardsHolder which has all the shards from {@code oldShards} except
* the shard at index {@code newShardIndex}, using the shard {@code newShard} at that index instead.
* @param oldShards original shards
* @param newShard a new shard
* @param newShardIndex index at which to place the new shard
*/
HollowListTypeShardsHolder(HollowListTypeReadStateShard[] oldShards, HollowListTypeReadStateShard newShard, int newShardIndex) {
int numShards = oldShards.length;
HollowListTypeReadStateShard[] shards = new HollowListTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
if (i == newShardIndex) {
shards[i] = newShard;
} else {
shards[i] = oldShards[i];
}
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

@Override
public HollowTypeReadStateShard[] getShards() {
return shards;
}

@Override
public int getShardNumberMask() {
return shardNumberMask;
}
}
@@ -82,8 +82,7 @@ public IntMap getOrdinalMapping() {
}

public HollowMapTypeReadState createHistoricalTypeReadState() {
HollowMapTypeReadState historicalTypeState = new HollowMapTypeReadState(null, typeState.getSchema(), 1);
historicalTypeState.setCurrentData(historicalDataElements);
HollowMapTypeReadState historicalTypeState = new HollowMapTypeReadState(typeState.getSchema(), historicalDataElements);
return historicalTypeState;
}

@@ -107,10 +106,18 @@ private void populateStats() {
historicalDataElements.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(totalBucketCount);
historicalDataElements.bitsPerMapSizeValue = 64 - Long.numberOfLeadingZeros(maxSize);
historicalDataElements.bitsPerFixedLengthMapPortion = historicalDataElements.bitsPerMapPointer + historicalDataElements.bitsPerMapSizeValue;
historicalDataElements.bitsPerKeyElement = stateEngineDataElements[0].bitsPerKeyElement;
historicalDataElements.bitsPerValueElement = stateEngineDataElements[0].bitsPerValueElement;
historicalDataElements.bitsPerMapEntry = stateEngineDataElements[0].bitsPerMapEntry;
historicalDataElements.emptyBucketKeyValue = stateEngineDataElements[0].emptyBucketKeyValue;
for (int i=0;i<stateEngineDataElements.length;i++) {
if (stateEngineDataElements[i].bitsPerKeyElement > historicalDataElements.bitsPerKeyElement) {
historicalDataElements.bitsPerKeyElement = stateEngineDataElements[i].bitsPerKeyElement;
historicalDataElements.emptyBucketKeyValue = stateEngineDataElements[i].emptyBucketKeyValue;
}
if (stateEngineDataElements[i].bitsPerValueElement > historicalDataElements.bitsPerValueElement) {
historicalDataElements.bitsPerValueElement = stateEngineDataElements[i].bitsPerValueElement;
}
if (stateEngineDataElements[i].bitsPerMapEntry > historicalDataElements.bitsPerMapEntry) {
historicalDataElements.bitsPerMapEntry = stateEngineDataElements[i].bitsPerMapEntry;
}
}
historicalDataElements.totalNumberOfBuckets = totalBucketCount;

ordinalMapping = new IntMap(removedEntryCount);
@@ -123,8 +130,8 @@ private void copyRecord(int ordinal) {
long bitsPerBucket = historicalDataElements.bitsPerMapEntry;
long size = typeState.size(ordinal);

long fromStartBucket = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].mapPointerAndSizeData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerFixedLengthMapPortion, stateEngineDataElements[shard].bitsPerMapPointer);
long fromEndBucket = stateEngineDataElements[shard].mapPointerAndSizeData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerFixedLengthMapPortion, stateEngineDataElements[shard].bitsPerMapPointer);
long fromStartBucket = stateEngineDataElements[shard].getStartBucket(shardOrdinal);
long fromEndBucket = stateEngineDataElements[shard].getEndBucket(shardOrdinal);
long numBuckets = fromEndBucket - fromStartBucket;

historicalDataElements.mapPointerAndSizeData.setElementValue((long)nextOrdinal * historicalDataElements.bitsPerFixedLengthMapPortion, historicalDataElements.bitsPerMapPointer, nextStartBucket + numBuckets);
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import java.io.IOException;

/**
@@ -31,35 +32,26 @@
* During a delta, the HollowMapTypeReadState will create a new HollowMapTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowMapTypeDataElements {

int maxOrdinal;
public class HollowMapTypeDataElements extends HollowTypeDataElements {

FixedLengthData mapPointerAndSizeData;
FixedLengthData entryData;

GapEncodedVariableLengthIntegerReader encodedRemovals;
GapEncodedVariableLengthIntegerReader encodedAdditions;

int bitsPerMapPointer;
int bitsPerMapSizeValue;
int bitsPerFixedLengthMapPortion;
int bitsPerKeyElement;
int bitsPerValueElement;
int bitsPerMapEntry;
int bitsPerKeyElement = 0;
int bitsPerValueElement = 0;
int bitsPerMapEntry = 0;
int emptyBucketKeyValue;
long totalNumberOfBuckets;

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

public HollowMapTypeDataElements(ArraySegmentRecycler memoryRecycler) {
this(MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowMapTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
super(memoryMode, memoryRecycler);
}

void readSnapshot(HollowBlobInput in) throws IOException {
@@ -121,9 +113,44 @@ public void applyDelta(HollowMapTypeDataElements fromData, HollowMapTypeDataElem
new HollowMapDeltaApplicator(fromData, deltaData, this).applyDelta();
}

@Override
public void destroy() {
FixedLengthDataFactory.destroy(mapPointerAndSizeData, memoryRecycler);
FixedLengthDataFactory.destroy(entryData, memoryRecycler);
}

long getStartBucket(int ordinal) {
return ordinal == 0 ? 0 : mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * bitsPerFixedLengthMapPortion, bitsPerMapPointer);
}

long getEndBucket(int ordinal) {
return mapPointerAndSizeData.getElementValue((long)ordinal * bitsPerFixedLengthMapPortion, bitsPerMapPointer);
}

int getBucketKeyByAbsoluteIndex(long absoluteBucketIndex) {
return (int)entryData.getElementValue(absoluteBucketIndex * bitsPerMapEntry, bitsPerKeyElement);
}

int getBucketValueByAbsoluteIndex(long absoluteBucketIndex) {
return (int)entryData.getElementValue((absoluteBucketIndex * bitsPerMapEntry) + bitsPerKeyElement, bitsPerValueElement);
}

void copyBucketsFrom(long startBucket, HollowMapTypeDataElements src, long srcStartBucket, long srcEndBucket) {
if (bitsPerKeyElement == src.bitsPerKeyElement && bitsPerValueElement == src.bitsPerValueElement) {
// fast path can bulk copy buckets. emptyBucketKeyValue is same since bitsPerKeyElement is the same
long numBuckets = srcEndBucket - srcStartBucket;
entryData.copyBits(src.entryData, srcStartBucket * bitsPerMapEntry, startBucket * bitsPerMapEntry, numBuckets * bitsPerMapEntry);
} else {
for (long bucket=srcStartBucket;bucket<srcEndBucket;bucket++) {
long bucketKey = src.entryData.getElementValue(bucket * src.bitsPerMapEntry, src.bitsPerKeyElement);
long bucketValue = src.entryData.getElementValue(bucket * src.bitsPerMapEntry + src.bitsPerKeyElement, src.bitsPerValueElement);
if(bucketKey == src.emptyBucketKeyValue)
bucketKey = emptyBucketKeyValue;
long targetBucketOffset = startBucket * bitsPerMapEntry;
entryData.setElementValue(targetBucketOffset, bitsPerKeyElement, bucketKey);
entryData.setElementValue(targetBucketOffset + bitsPerKeyElement, bitsPerValueElement, bucketValue);
startBucket++;
}
}
}
}
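getStartBucket and getEndBucket work because each ordinal's fixed-length slot stores only that record's end bucket; the start bucket is recovered from the previous ordinal's end (or 0 for ordinal 0). A toy model of that cumulative encoding, assuming a plain long[] in place of the packed mapPointerAndSizeData:

// Cumulative end-pointer encoding behind getStartBucket()/getEndBucket().
// A plain long[] stands in for the packed fixed-length data; illustrative only.
public class BucketPointerSketch {
    // ordinal 0 owns buckets [0,3), ordinal 1 owns [3,3) (empty map), ordinal 2 owns [3,8)
    static final long[] END_BUCKET = {3, 3, 8};

    static long getStartBucket(int ordinal) {
        return ordinal == 0 ? 0 : END_BUCKET[ordinal - 1];
    }

    static long getEndBucket(int ordinal) {
        return END_BUCKET[ordinal];
    }

    public static void main(String[] args) {
        for (int ordinal = 0; ordinal < END_BUCKET.length; ordinal++) {
            System.out.printf("ordinal %d -> buckets [%d, %d)%n", ordinal, getStartBucket(ordinal), getEndBucket(ordinal));
        }
    }
}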
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.netflix.hollow.core.read.engine.map;

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;


/**
* Join multiple {@code HollowMapTypeDataElements}s into 1 {@code HollowMapTypeDataElements}.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* The number of data elements passed in must be a power of 2.
*/
class HollowMapTypeDataElementsJoiner extends HollowTypeDataElementsJoiner<HollowMapTypeDataElements> {

public HollowMapTypeDataElementsJoiner(HollowMapTypeDataElements[] from) {
super(from);
}

@Override
public void initToElements() {
this.to = new HollowMapTypeDataElements(from[0].memoryMode, from[0].memoryRecycler);
}

@Override
public void populateStats() {
for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
int mappedMaxOrdinal = from[fromIndex].maxOrdinal == -1 ? -1 : (from[fromIndex].maxOrdinal * from.length) + fromIndex;
to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);

// stats can be uneven for consumers that skipped type shards with no additions, so pick the max across all shards
HollowMapTypeDataElements source = from[fromIndex];
if (source.bitsPerKeyElement > to.bitsPerKeyElement) {
to.bitsPerKeyElement = source.bitsPerKeyElement;
}
if (source.bitsPerValueElement > to.bitsPerValueElement) {
to.bitsPerValueElement = source.bitsPerValueElement;
}
if (source.bitsPerMapSizeValue > to.bitsPerMapSizeValue) {
to.bitsPerMapSizeValue = source.bitsPerMapSizeValue;
}
}
to.emptyBucketKeyValue = (1 << to.bitsPerKeyElement) - 1;

long totalOfMapBuckets = 0;
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

HollowMapTypeDataElements source = from[fromIndex];

long startBucket = source.getStartBucket(fromOrdinal);
long endBucket = source.getEndBucket(fromOrdinal);
long numBuckets = endBucket - startBucket;

totalOfMapBuckets += numBuckets;
}

to.totalNumberOfBuckets = totalOfMapBuckets;
to.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(to.totalNumberOfBuckets);
to.bitsPerFixedLengthMapPortion = to.bitsPerMapSizeValue + to.bitsPerMapPointer;
to.bitsPerMapEntry = to.bitsPerKeyElement + to.bitsPerValueElement;
}

@Override
public void copyRecords() {
long bucketCounter = 0;

to.mapPointerAndSizeData = FixedLengthDataFactory.get((long)(to.maxOrdinal + 1) * to.bitsPerFixedLengthMapPortion, to.memoryMode, to.memoryRecycler);
to.entryData = FixedLengthDataFactory.get(to.totalNumberOfBuckets * to.bitsPerMapEntry, to.memoryMode, to.memoryRecycler);

for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

HollowMapTypeDataElements source = from[fromIndex];

long mapSize = 0;
if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else: lopsided shards can result from skipping type shards with no additions, and mapSize remains 0
long startBucket = source.getStartBucket(fromOrdinal);
long endBucket = source.getEndBucket(fromOrdinal);

long numBuckets = endBucket - startBucket;
to.copyBucketsFrom(bucketCounter, source, startBucket, endBucket);
bucketCounter += numBuckets;

mapSize = source.mapPointerAndSizeData.getElementValue(((long)fromOrdinal * source.bitsPerFixedLengthMapPortion) + source.bitsPerMapPointer, source.bitsPerMapSizeValue);
}
to.mapPointerAndSizeData.setElementValue((long)ordinal * to.bitsPerFixedLengthMapPortion, to.bitsPerMapPointer, bucketCounter);
to.mapPointerAndSizeData.setElementValue(((long)ordinal * to.bitsPerFixedLengthMapPortion) + to.bitsPerMapPointer, to.bitsPerMapSizeValue, mapSize);
}
}
}
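The joiner interleaves shard-local ordinal spaces into one: joined ordinal o is served by shard o & fromMask at local ordinal o >> fromOrdinalShift, which is why a shard's maxOrdinal maps to (maxOrdinal * numShards) + shardIndex. A standalone sketch with hypothetical shard sizes, also flagging the lopsided case that copyRecords guards against:

// Joiner-style ordinal remapping across 2 shards; illustrative values only.
public class JoinRemapSketch {
    public static void main(String[] args) {
        int numShards = 2;
        int fromMask = numShards - 1;
        int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
        int[] shardMaxOrdinal = {4, 2};   // lopsided shards are possible

        int joinedMaxOrdinal = -1;
        for (int shard = 0; shard < numShards; shard++) {
            int mapped = shardMaxOrdinal[shard] == -1 ? -1 : (shardMaxOrdinal[shard] * numShards) + shard;
            joinedMaxOrdinal = Math.max(joinedMaxOrdinal, mapped);
        }
        System.out.println("joined maxOrdinal = " + joinedMaxOrdinal);  // 8

        for (int ordinal = 0; ordinal <= joinedMaxOrdinal; ordinal++) {
            int fromIndex = ordinal & fromMask;
            int fromOrdinal = ordinal >> fromOrdinalShift;
            boolean absent = fromOrdinal > shardMaxOrdinal[fromIndex]; // lopsided: no record to copy
            System.out.printf("ordinal %d <- shard %d, local ordinal %d%s%n",
                    ordinal, fromIndex, fromOrdinal, absent ? " (absent)" : "");
        }
    }
}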
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.netflix.hollow.core.read.engine.map;


import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;

/**
* Split a {@code HollowMapTypeDataElements} into multiple {@code HollowMapTypeDataElements}s.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* {@code numSplits} must be a power of 2.
*/
public class HollowMapTypeDataElementsSplitter extends HollowTypeDataElementsSplitter<HollowMapTypeDataElements> {

public HollowMapTypeDataElementsSplitter(HollowMapTypeDataElements from, int numSplits) {
super(from, numSplits);
}

@Override
public void initToElements() {
this.to = new HollowMapTypeDataElements[numSplits];
for(int i=0;i<to.length;i++) {
to[i] = new HollowMapTypeDataElements(from.memoryMode, from.memoryRecycler);
}
}

@Override
public void populateStats() {
long[] shardTotalOfMapBuckets = new long[numSplits];
long maxShardTotalOfMapBuckets = 0;

for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
int toIndex = ordinal & toMask;
int toOrdinal = ordinal >> toOrdinalShift;
to[toIndex].maxOrdinal = toOrdinal;

long startBucket = from.getStartBucket(ordinal);
long endBucket = from.getEndBucket(ordinal);
long numBuckets = endBucket - startBucket;

shardTotalOfMapBuckets[toIndex] += numBuckets;
if(shardTotalOfMapBuckets[toIndex] > maxShardTotalOfMapBuckets) {
maxShardTotalOfMapBuckets = shardTotalOfMapBuckets[toIndex];
}
}

for(int toIndex=0;toIndex<numSplits;toIndex++) {
HollowMapTypeDataElements target = to[toIndex];
// retained
target.bitsPerKeyElement = from.bitsPerKeyElement;
target.bitsPerValueElement = from.bitsPerValueElement;
target.bitsPerMapSizeValue = from.bitsPerMapSizeValue;
target.emptyBucketKeyValue = from.emptyBucketKeyValue;

// recomputed
target.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
target.totalNumberOfBuckets = shardTotalOfMapBuckets[toIndex];
target.bitsPerFixedLengthMapPortion = target.bitsPerMapSizeValue + target.bitsPerMapPointer;
target.bitsPerMapEntry = target.bitsPerKeyElement + target.bitsPerValueElement;
}
}

@Override
public void copyRecords() {
int numSplits = to.length;
long bucketCounter[] = new long[numSplits];

for(int toIndex=0;toIndex<numSplits;toIndex++) {
HollowMapTypeDataElements target = to[toIndex];
target.mapPointerAndSizeData = FixedLengthDataFactory.get((long)(target.maxOrdinal + 1) * target.bitsPerFixedLengthMapPortion, target.memoryMode, target.memoryRecycler);
target.entryData = FixedLengthDataFactory.get(target.totalNumberOfBuckets * target.bitsPerMapEntry, target.memoryMode, target.memoryRecycler);
}

for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
int toIndex = ordinal & toMask;
int toOrdinal = ordinal >> toOrdinalShift;

HollowMapTypeDataElements target = to[toIndex];
long startBucket = from.getStartBucket(ordinal);
long endBucket = from.getEndBucket(ordinal);

long numBuckets = endBucket - startBucket;
target.copyBucketsFrom(bucketCounter[toIndex], from, startBucket, endBucket);
bucketCounter[toIndex] += numBuckets;

target.mapPointerAndSizeData.setElementValue((long)toOrdinal * target.bitsPerFixedLengthMapPortion, target.bitsPerMapPointer, bucketCounter[toIndex]);
long mapSize = from.mapPointerAndSizeData.getElementValue(((long)ordinal * from.bitsPerFixedLengthMapPortion) + from.bitsPerMapPointer, from.bitsPerMapSizeValue);
target.mapPointerAndSizeData.setElementValue(((long)toOrdinal * target.bitsPerFixedLengthMapPortion) + target.bitsPerMapPointer, target.bitsPerMapSizeValue, mapSize);
}
}
}
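One subtlety above: bitsPerMapPointer is sized from the largest split's bucket total, so every split shares the same pointer width even though each stores only its own buckets. A tiny sketch of that sizing rule with hypothetical totals:

// Pointer width is driven by the largest split's bucket count; illustrative only.
public class PointerWidthSketch {
    public static void main(String[] args) {
        long[] shardTotalOfMapBuckets = {1_000, 750_000, 12, 9_999}; // hypothetical per-split totals
        long maxShardTotalOfMapBuckets = 0;
        for (long total : shardTotalOfMapBuckets) {
            maxShardTotalOfMapBuckets = Math.max(maxShardTotalOfMapBuckets, total);
        }
        int bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
        System.out.println("bitsPerMapPointer = " + bitsPerMapPointer); // 20 bits, enough for 750,000
    }
}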

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -18,212 +18,77 @@

import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;

import com.netflix.hollow.core.index.key.HollowPrimaryKeyValueDeriver;
import com.netflix.hollow.core.memory.HollowUnsafeHandle;
import com.netflix.hollow.core.memory.encoding.HashCodes;
import com.netflix.hollow.core.read.engine.SetMapKeyHasher;
import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.tools.checksum.HollowChecksum;
import java.util.BitSet;

class HollowMapTypeReadStateShard {

private volatile HollowMapTypeDataElements currentDataVolatile;

private HollowPrimaryKeyValueDeriver keyDeriver;

public int size(int ordinal) {
HollowMapTypeDataElements currentData;
int size;
class HollowMapTypeReadStateShard implements HollowTypeReadStateShard {

do {
currentData = this.currentDataVolatile;
size = (int)currentData.mapPointerAndSizeData.getElementValue(((long)ordinal * currentData.bitsPerFixedLengthMapPortion) + currentData.bitsPerMapPointer, currentData.bitsPerMapSizeValue);
} while(readWasUnsafe(currentData));
final HollowMapTypeDataElements dataElements;
final int shardOrdinalShift;

return size;
@Override
public HollowMapTypeDataElements getDataElements() {
return dataElements;
}

public int get(int ordinal, int keyOrdinal, int hashCode) {
HollowMapTypeDataElements currentData;
int valueOrdinal;

threadsafe:
do {
long startBucket;
long endBucket;
do {
currentData = this.currentDataVolatile;

startBucket = ordinal == 0 ? 0 : currentData.mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
endBucket = currentData.mapPointerAndSizeData.getElementValue((long)ordinal * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
} while(readWasUnsafe(currentData));

hashCode = HashCodes.hashInt(hashCode);
long bucket = startBucket + (hashCode & (endBucket - startBucket - 1));
int bucketKeyOrdinal = getBucketKeyByAbsoluteIndex(currentData, bucket);

while(bucketKeyOrdinal != currentData.emptyBucketKeyValue) {
if(bucketKeyOrdinal == keyOrdinal) {
valueOrdinal = getBucketValueByAbsoluteIndex(currentData, bucket);
continue threadsafe;
}
bucket++;
if(bucket == endBucket)
bucket = startBucket;
bucketKeyOrdinal = getBucketKeyByAbsoluteIndex(currentData, bucket);
}

valueOrdinal = ORDINAL_NONE;
} while(readWasUnsafe(currentData));

return valueOrdinal;
@Override
public int getShardOrdinalShift() {
return shardOrdinalShift;
}

public int findKey(int ordinal, Object... hashKey) {
int hashCode = SetMapKeyHasher.hash(hashKey, keyDeriver.getFieldTypes());

HollowMapTypeDataElements currentData;

threadsafe:
do {
long startBucket;
long endBucket;
do {
currentData = this.currentDataVolatile;

startBucket = ordinal == 0 ? 0 : currentData.mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
endBucket = currentData.mapPointerAndSizeData.getElementValue((long)ordinal * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
} while(readWasUnsafe(currentData));

long bucket = startBucket + (hashCode & (endBucket - startBucket - 1));
int bucketKeyOrdinal = getBucketKeyByAbsoluteIndex(currentData, bucket);

while(bucketKeyOrdinal != currentData.emptyBucketKeyValue) {
if(readWasUnsafe(currentData))
continue threadsafe;

if(keyDeriver.keyMatches(bucketKeyOrdinal, hashKey)) {
return bucketKeyOrdinal;
}

bucket++;
if(bucket == endBucket)
bucket = startBucket;
bucketKeyOrdinal = getBucketKeyByAbsoluteIndex(currentData, bucket);
}

} while(readWasUnsafe(currentData));

return ORDINAL_NONE;
public HollowMapTypeReadStateShard(HollowMapTypeDataElements dataElements, int shardOrdinalShift) {
this.shardOrdinalShift = shardOrdinalShift;
this.dataElements = dataElements;
}

public long findEntry(int ordinal, Object... hashKey) {
int hashCode = SetMapKeyHasher.hash(hashKey, keyDeriver.getFieldTypes());

HollowMapTypeDataElements currentData;

threadsafe:
do {
long startBucket;
long endBucket;
do {
currentData = this.currentDataVolatile;

startBucket = ordinal == 0 ? 0 : currentData.mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
endBucket = currentData.mapPointerAndSizeData.getElementValue((long)ordinal * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
} while(readWasUnsafe(currentData));

long bucket = startBucket + (hashCode & (endBucket - startBucket - 1));
int bucketKeyOrdinal = getBucketKeyByAbsoluteIndex(currentData, bucket);

while(bucketKeyOrdinal != currentData.emptyBucketKeyValue) {
if(readWasUnsafe(currentData))
continue threadsafe;

if(keyDeriver.keyMatches(bucketKeyOrdinal, hashKey)) {
long valueOrdinal = getBucketValueByAbsoluteIndex(currentData, bucket);
if(readWasUnsafe(currentData))
continue threadsafe;
public int size(int ordinal) {
int size = (int)dataElements.mapPointerAndSizeData.getElementValue(((long)ordinal * dataElements.bitsPerFixedLengthMapPortion) + dataElements.bitsPerMapPointer, dataElements.bitsPerMapSizeValue);
return size;
}

return (long)bucketKeyOrdinal << 32 | valueOrdinal;
}
int get(int hashCode, long startBucket, long endBucket, int keyOrdinal) {
hashCode = HashCodes.hashInt(hashCode);
long bucket = startBucket + (hashCode & (endBucket - startBucket - 1));
int bucketKeyOrdinal = dataElements.getBucketKeyByAbsoluteIndex(bucket);

bucket++;
if(bucket == endBucket)
bucket = startBucket;
bucketKeyOrdinal = getBucketKeyByAbsoluteIndex(currentData, bucket);
while(bucketKeyOrdinal != dataElements.emptyBucketKeyValue) {
if(bucketKeyOrdinal == keyOrdinal) {
return dataElements.getBucketValueByAbsoluteIndex(bucket);
}

} while(readWasUnsafe(currentData));

return -1L;
bucket++;
if(bucket == endBucket)
bucket = startBucket;
bucketKeyOrdinal = dataElements.getBucketKeyByAbsoluteIndex(bucket);
}
return ORDINAL_NONE;
}

public long relativeBucket(int ordinal, int bucketIndex) {
HollowMapTypeDataElements currentData;
public long relativeBucket(long absoluteBucketIndex) {
long bucketValue;
do {
long absoluteBucketIndex;
do {
currentData = this.currentDataVolatile;
absoluteBucketIndex = getAbsoluteBucketStart(currentData, ordinal) + bucketIndex;
} while(readWasUnsafe(currentData));
long key = getBucketKeyByAbsoluteIndex(currentData, absoluteBucketIndex);
if(key == currentData.emptyBucketKeyValue)
return -1L;

bucketValue = key << 32 | getBucketValueByAbsoluteIndex(currentData, absoluteBucketIndex);
} while(readWasUnsafe(currentData));

long key = dataElements.getBucketKeyByAbsoluteIndex(absoluteBucketIndex);
if(key == dataElements.emptyBucketKeyValue)
return -1L;
bucketValue = key << 32 | dataElements.getBucketValueByAbsoluteIndex(absoluteBucketIndex);
return bucketValue;
}

private long getAbsoluteBucketStart(HollowMapTypeDataElements currentData, int ordinal) {
long startBucket = ordinal == 0 ? 0 : currentData.mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * currentData.bitsPerFixedLengthMapPortion, currentData.bitsPerMapPointer);
return startBucket;
}

private int getBucketKeyByAbsoluteIndex(HollowMapTypeDataElements currentData, long absoluteBucketIndex) {
return (int)currentData.entryData.getElementValue(absoluteBucketIndex * currentData.bitsPerMapEntry, currentData.bitsPerKeyElement);
}

private int getBucketValueByAbsoluteIndex(HollowMapTypeDataElements currentData, long absoluteBucketIndex) {
return (int)currentData.entryData.getElementValue((absoluteBucketIndex * currentData.bitsPerMapEntry) + currentData.bitsPerKeyElement, currentData.bitsPerValueElement);
}

void invalidate() {
setCurrentData(null);
}

HollowMapTypeDataElements currentDataElements() {
return currentDataVolatile;
}

private boolean readWasUnsafe(HollowMapTypeDataElements data) {
HollowUnsafeHandle.getUnsafe().loadFence();
return data != currentDataVolatile;
}

void setCurrentData(HollowMapTypeDataElements data) {
this.currentDataVolatile = data;
}

protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) {
HollowMapTypeDataElements currentData = currentDataVolatile;
protected void applyShardToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) {
int ordinal = populatedOrdinals.nextSetBit(shardNumber);
while(ordinal != ORDINAL_NONE) {
if((ordinal & (numShards - 1)) == shardNumber) {
int shardOrdinal = ordinal / numShards;
int numBuckets = HashCodes.hashTableSize(size(shardOrdinal));
long offset = getAbsoluteBucketStart(currentData, shardOrdinal);
long offset = dataElements.getStartBucket(shardOrdinal);

checksum.applyInt(ordinal);
for(int i=0; i<numBuckets; i++) {
int bucketKey = getBucketKeyByAbsoluteIndex(currentData, offset + i);
if(bucketKey != currentData.emptyBucketKeyValue) {
int bucketKey = dataElements.getBucketKeyByAbsoluteIndex(offset + i);
if(bucketKey != dataElements.emptyBucketKeyValue) {
checksum.applyInt(i);
checksum.applyInt(bucketKey);
checksum.applyInt(getBucketValueByAbsoluteIndex(currentData, offset + i));
checksum.applyInt(dataElements.getBucketValueByAbsoluteIndex(offset + i));
}
}
ordinal = ordinal + numShards;
@@ -237,32 +102,23 @@ protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals
}

public long getApproximateHeapFootprintInBytes() {
HollowMapTypeDataElements currentData = currentDataVolatile;
long requiredBitsForMapPointers = ((long)currentData.maxOrdinal + 1) * currentData.bitsPerFixedLengthMapPortion;
long requiredBitsForMapBuckets = (long)currentData.totalNumberOfBuckets * currentData.bitsPerMapEntry;
long requiredBitsForMapPointers = ((long)dataElements.maxOrdinal + 1) * dataElements.bitsPerFixedLengthMapPortion;
long requiredBitsForMapBuckets = (long)dataElements.totalNumberOfBuckets * dataElements.bitsPerMapEntry;
long requiredBits = requiredBitsForMapPointers + requiredBitsForMapBuckets;
return requiredBits / 8;
}

public long getApproximateHoleCostInBytes(BitSet populatedOrdinals, int shardNumber, int numShards) {
HollowMapTypeDataElements currentData = currentDataVolatile;
long holeBits = 0;

int holeOrdinal = populatedOrdinals.nextClearBit(0);
while(holeOrdinal <= currentData.maxOrdinal) {
while(holeOrdinal <= dataElements.maxOrdinal) {
if((holeOrdinal & (numShards - 1)) == shardNumber)
holeBits += currentData.bitsPerFixedLengthMapPortion;
holeBits += dataElements.bitsPerFixedLengthMapPortion;

holeOrdinal = populatedOrdinals.nextClearBit(holeOrdinal + 1);
}

return holeBits / 8;
}


public void setKeyDeriver(HollowPrimaryKeyValueDeriver keyDeriver) {
this.keyDeriver = keyDeriver;
}


}
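Compared with the removed currentDataVolatile/readWasUnsafe retry loop, each shard now holds a final reference to effectively immutable data elements, so a reader needs only a single volatile read of the current shards holder. A minimal copy-on-write sketch of that pattern, using illustrative classes rather than Hollow's actual types:

import java.util.concurrent.atomic.AtomicReference;

// Copy-on-write shards: readers pin one consistent view with a single volatile
// read; updates publish a brand-new holder. Illustrative only.
public class CopyOnWriteShardsSketch {
    static final class Shard {
        final int[] data;                       // immutable once published
        Shard(int[] data) { this.data = data; }
    }
    static final class Holder {
        final Shard[] shards;
        Holder(Shard[] shards) { this.shards = shards; }
    }

    static final AtomicReference<Holder> current =
            new AtomicReference<>(new Holder(new Shard[]{ new Shard(new int[]{1, 2}) }));

    static int read(int shardIdx, int i) {
        Holder h = current.get();               // one volatile read pins a consistent view
        return h.shards[shardIdx].data[i];
    }

    static void replaceShard(int shardIdx, int[] newData) {
        Holder old = current.get();
        Shard[] copy = old.shards.clone();      // copy-on-write: old holder stays intact
        copy[shardIdx] = new Shard(newData);
        current.set(new Holder(copy));          // atomic publish of the new view
    }

    public static void main(String[] args) {
        System.out.println(read(0, 1));         // 2
        replaceShard(0, new int[]{7, 8});
        System.out.println(read(0, 1));         // 8
    }
}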
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.netflix.hollow.core.read.engine.map;

import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;
import com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy;

public class HollowMapTypeReshardingStrategy extends HollowTypeReshardingStrategy {
@Override
public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataElements from, int shardingFactor) {
return new HollowMapTypeDataElementsSplitter((HollowMapTypeDataElements) from, shardingFactor);
}

@Override
public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) {
return new HollowMapTypeDataElementsJoiner((HollowMapTypeDataElements[]) from);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.netflix.hollow.core.read.engine.map;

import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.ShardsHolder;

public class HollowMapTypeShardsHolder extends ShardsHolder {
final HollowMapTypeReadStateShard shards[];
final int shardNumberMask;

/**
* Thread-safe construction of a ShardsHolder with the given shards.
* @param fromShards shards to be used
*/
public HollowMapTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {
this.shards = new HollowMapTypeReadStateShard[fromShards.length];
for (int i=0; i<fromShards.length; i++) {
this.shards[i] = (HollowMapTypeReadStateShard) fromShards[i];
}
this.shardNumberMask = fromShards.length - 1;
}

/**
* Thread-safe construction of a ShardsHolder that has all the shards from {@code oldShards} except
* the shard at index {@code newShardIndex}, using the shard {@code newShard} at that index instead.
* @param oldShards original shards
* @param newShard a new shard
* @param newShardIndex index at which to place the new shard
*/
HollowMapTypeShardsHolder(HollowMapTypeReadStateShard[] oldShards, HollowMapTypeReadStateShard newShard, int newShardIndex) {
int numShards = oldShards.length;
HollowMapTypeReadStateShard[] shards = new HollowMapTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
if (i == newShardIndex) {
shards[i] = newShard;
} else {
shards[i] = oldShards[i];
}
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

@Override
public HollowTypeReadStateShard[] getShards() {
return shards;
}

@Override
public int getShardNumberMask() {
return shardNumberMask;
}
}
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ public class HollowObjectDeltaHistoricalStateCreator {
private final HollowObjectTypeDataElements historicalDataElements;

private HollowObjectTypeReadState typeState;
private HollowObjectTypeReadState.ShardsHolder shardsHolder;
private HollowObjectTypeShardsHolder shardsHolder;
private RemovedOrdinalIterator iter;
private IntMap ordinalMapping;
private int nextOrdinal;
@@ -96,7 +96,6 @@ public IntMap getOrdinalMapping() {

public HollowObjectTypeReadState createHistoricalTypeReadState() {
HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(typeState.getSchema(), historicalDataElements);

return historicalTypeState;
}

Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import com.netflix.hollow.core.schema.HollowObjectSchema;
import java.io.IOException;

@@ -35,18 +36,13 @@
* During a delta, the HollowObjectTypeReadState will create a new HollowObjectTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowObjectTypeDataElements {
public class HollowObjectTypeDataElements extends HollowTypeDataElements {

final HollowObjectSchema schema;

int maxOrdinal;

FixedLengthData fixedLengthData;
final VariableLengthData varLengthData[];

GapEncodedVariableLengthIntegerReader encodedAdditions;
GapEncodedVariableLengthIntegerReader encodedRemovals;

final int bitsPerField[];
final int bitOffsetPerField[];
final long nullValueForField[];
@@ -55,21 +51,17 @@ public class HollowObjectTypeDataElements {
private int bitsPerUnfilteredField[];
private boolean unfilteredFieldIsIncluded[];

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

public HollowObjectTypeDataElements(HollowObjectSchema schema, ArraySegmentRecycler memoryRecycler) {
this(schema, MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowObjectTypeDataElements(HollowObjectSchema schema, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
super(memoryMode, memoryRecycler);
varLengthData = new VariableLengthData[schema.numFields()];
bitsPerField = new int[schema.numFields()];
bitOffsetPerField = new int[schema.numFields()];
nullValueForField = new long[schema.numFields()];
this.schema = schema;
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
}

void readSnapshot(HollowBlobInput in, HollowObjectSchema unfilteredSchema) throws IOException {
@@ -199,10 +191,11 @@ static void discardFromInput(HollowBlobInput in, HollowObjectSchema schema, int
}
}

void applyDelta(HollowObjectTypeDataElements fromData, HollowObjectTypeDataElements deltaData) {
public void applyDelta(HollowObjectTypeDataElements fromData, HollowObjectTypeDataElements deltaData) {
new HollowObjectDeltaApplicator(fromData, deltaData, this).applyDelta();
}

@Override
public void destroy() {
FixedLengthDataFactory.destroy(fixedLengthData, memoryRecycler);
for(int i=0;i<varLengthData.length;i++) {
@@ -240,18 +233,31 @@ static long varLengthSize(HollowObjectTypeDataElements from, int ordinal, int fi

static void copyRecord(HollowObjectTypeDataElements to, int toOrdinal, HollowObjectTypeDataElements from, int fromOrdinal, long[] currentWriteVarLengthDataPointers) {
for(int fieldIndex=0;fieldIndex<to.schema.numFields();fieldIndex++) {
if(to.varLengthData[fieldIndex] == null) {
long value = from.fixedLengthData.getLargeElementValue(((long)fromOrdinal * from.bitsPerRecord) + from.bitOffsetPerField[fieldIndex], from.bitsPerField[fieldIndex]);
to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], value);
long currentReadFixedLengthStartBit = ((long)fromOrdinal * from.bitsPerRecord) + from.bitOffsetPerField[fieldIndex];
long readValue = from.bitsPerField[fieldIndex] > 56 ?
from.fixedLengthData.getLargeElementValue(currentReadFixedLengthStartBit, from.bitsPerField[fieldIndex])
: from.fixedLengthData.getElementValue(currentReadFixedLengthStartBit, from.bitsPerField[fieldIndex]);

long toWriteFixedLengthStartBit = ((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex];
if(to.varLengthData[fieldIndex] == null) { // fixed len data
if(readValue == from.nullValueForField[fieldIndex]) {
writeNullFixedLengthField(to, fieldIndex, toWriteFixedLengthStartBit);
}
else {
to.fixedLengthData.setElementValue(toWriteFixedLengthStartBit, to.bitsPerField[fieldIndex], readValue);
}
} else {
long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex);
long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex);
long size = fromEndByte - fromStartByte;

to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size);
to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size);

currentWriteVarLengthDataPointers[fieldIndex] += size;
if ((readValue & (1L << (from.bitsPerField[fieldIndex] - 1))) != 0) { // null check: the top bit is set (the remaining bits hold the end offset of the last non-null value)
writeNullVarLengthField(to, fieldIndex, toWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex);
long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex);
long size = fromEndByte - fromStartByte;

to.fixedLengthData.setElementValue(toWriteFixedLengthStartBit, to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size);
to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size);
currentWriteVarLengthDataPointers[fieldIndex] += size;
}
}
}
}
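In the var-length branch above, null is detected by testing the designated top bit of the field's fixed-length slot, as the inline comment notes. A standalone sketch of that test, with made-up bit widths and offsets:

// Var-length null test: the slot is null when its top bit is set; the lower
// bits carry the end offset of the last non-null value. Illustrative only.
public class VarLengthNullSketch {
    public static void main(String[] args) {
        int bitsPerField = 10;
        long nullBit = 1L << (bitsPerField - 1);

        long populated = 137L;                  // plain end offset, top bit clear
        long nulled = nullBit | 137L;           // null marker over the same offset

        System.out.println(isNull(populated, bitsPerField)); // false
        System.out.println(isNull(nulled, bitsPerField));    // true
    }

    static boolean isNull(long readValue, int bitsPerField) {
        return (readValue & (1L << (bitsPerField - 1))) != 0;
    }
}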
Original file line number Diff line number Diff line change
@@ -6,7 +6,8 @@

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.memory.VariableLengthDataFactory;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;
import com.netflix.hollow.core.schema.HollowObjectSchema;


/**
@@ -15,62 +16,24 @@
* The original data elements are not destroyed.
* The number of data elements passed in must be a power of 2.
*/
class HollowObjectTypeDataElementsJoiner {
public class HollowObjectTypeDataElementsJoiner extends HollowTypeDataElementsJoiner<HollowObjectTypeDataElements> {

HollowObjectTypeDataElements join(HollowObjectTypeDataElements[] from) {
final int fromMask = from.length - 1;
final int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);
long[] currentWriteVarLengthDataPointers;
private HollowObjectSchema schema;

if (from.length<=0 || !((from.length&(from.length-1))==0)) {
throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
}

HollowObjectTypeDataElements to = new HollowObjectTypeDataElements(from[0].schema, from[0].memoryMode, from[0].memoryRecycler);
currentWriteVarLengthDataPointers = new long[from[0].schema.numFields()];

populateStats(to, from);

GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
for (int i=0;i<from.length;i++) {
fromRemovals[i] = from[i].encodedRemovals;
}
to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);

for (HollowObjectTypeDataElements elements : from) {
if (elements.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}

for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

if (fromOrdinal <= from[fromIndex].maxOrdinal) {
copyRecord(to, ordinal, from[fromIndex], fromOrdinal, currentWriteVarLengthDataPointers);
} else {
// lopsided shards could result for consumers that skip type shards with no additions
writeNullRecord(to, ordinal, currentWriteVarLengthDataPointers);
}
}

return to;
public HollowObjectTypeDataElementsJoiner(HollowObjectTypeDataElements[] from) {
super(from);
this.schema = from[0].schema;
}

private void writeNullRecord(HollowObjectTypeDataElements to, int toOrdinal, long[] currentWriteVarLengthDataPointers) {
for(int fieldIndex=0;fieldIndex<to.schema.numFields();fieldIndex++) {
long currentWriteFixedLengthStartBit = ((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex];
writeNullField(to, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
}
@Override
public void initToElements() {
this.to = new HollowObjectTypeDataElements(schema, from[0].memoryMode, from[0].memoryRecycler);
}

void populateStats(HollowObjectTypeDataElements to, HollowObjectTypeDataElements[] from) {
@Override
public void populateStats() {
long[] varLengthSizes = new long[to.schema.numFields()];

to.maxOrdinal = -1;
for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
for(int ordinal=0;ordinal<=from[fromIndex].maxOrdinal;ordinal++) {
for(int fieldIdx=0;fieldIdx<to.schema.numFields();fieldIdx++) {
@@ -105,10 +68,36 @@ void populateStats(HollowObjectTypeDataElements to, HollowObjectTypeDataElements
to.bitOffsetPerField[fieldIdx] = to.bitsPerRecord;
to.bitsPerRecord += to.bitsPerField[fieldIdx];
}
to.fixedLengthData = FixedLengthDataFactory.get((long)to.bitsPerRecord * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);

// unused
// to.bitsPerUnfilteredField
// to.unfilteredFieldIsIncluded
}

@Override
public void copyRecords() {
long[] currentWriteVarLengthDataPointers = new long[from[0].schema.numFields()];

to.fixedLengthData = FixedLengthDataFactory.get((long)to.bitsPerRecord * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);

for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
int fromIndex = ordinal & fromMask;
int fromOrdinal = ordinal >> fromOrdinalShift;

if (fromOrdinal <= from[fromIndex].maxOrdinal) {
copyRecord(to, ordinal, from[fromIndex], fromOrdinal, currentWriteVarLengthDataPointers);
} else {
// handle lopsided shards that could result from e.g. consumers skipping type shards with no additions
writeNullRecord(to, ordinal, currentWriteVarLengthDataPointers);
}
}

}

private void writeNullRecord(HollowObjectTypeDataElements to, int toOrdinal, long[] currentWriteVarLengthDataPointers) {
for(int fieldIndex=0;fieldIndex<to.schema.numFields();fieldIndex++) {
long currentWriteFixedLengthStartBit = ((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex];
writeNullField(to, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
}
}
}
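populateStats lays the joined fields out back to back: each field's bit offset is the running sum of the widths before it, and the final sum is bitsPerRecord, which copyRecords multiplies by the ordinal to find a record's start bit. A standalone sketch with hypothetical field widths:

// Field layout arithmetic behind bitOffsetPerField/bitsPerRecord; illustrative only.
public class RecordLayoutSketch {
    public static void main(String[] args) {
        int[] bitsPerField = {9, 26, 61};   // hypothetical joined widths
        int[] bitOffsetPerField = new int[bitsPerField.length];
        int bitsPerRecord = 0;
        for (int f = 0; f < bitsPerField.length; f++) {
            bitOffsetPerField[f] = bitsPerRecord;   // running sum of earlier widths
            bitsPerRecord += bitsPerField[f];
        }
        long ordinal = 42;
        long startBit = ordinal * bitsPerRecord + bitOffsetPerField[2]; // field 2 of record 42
        System.out.println("bitsPerRecord = " + bitsPerRecord + ", start bit = " + startBit); // 96, 4067
    }
}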
Original file line number Diff line number Diff line change
@@ -5,64 +5,33 @@

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.memory.VariableLengthDataFactory;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;
import com.netflix.hollow.core.schema.HollowObjectSchema;

/**
* Split a {@code HollowObjectTypeDataElements} into multiple {@code HollowObjectTypeDataElements}s.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* {@code numSplits} must be a power of 2.
*/
public class HollowObjectTypeDataElementsSplitter {
public class HollowObjectTypeDataElementsSplitter extends HollowTypeDataElementsSplitter<HollowObjectTypeDataElements> {
private HollowObjectSchema schema;

HollowObjectTypeDataElements[] split(HollowObjectTypeDataElements from, int numSplits) {
final int toMask = numSplits - 1;
final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
final long[][] currentWriteVarLengthDataPointers;

if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
throw new IllegalStateException("Must split by power of 2");
}

HollowObjectTypeDataElements[] to = new HollowObjectTypeDataElements[numSplits];
for(int i=0;i<to.length;i++) {
to[i] = new HollowObjectTypeDataElements(from.schema, from.memoryMode, from.memoryRecycler);
to[i].maxOrdinal = -1;
}
currentWriteVarLengthDataPointers = new long[numSplits][from.schema.numFields()];

populateStats(to, from, toMask, toOrdinalShift);

if (from.encodedRemovals != null) {
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
for(int i=0;i<to.length;i++) {
to[i].encodedRemovals = splitRemovals[i];
}
}
if (from.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
public HollowObjectTypeDataElementsSplitter(HollowObjectTypeDataElements from, int numSplits) {
super(from, numSplits);
this.schema = from.schema;
}

@Override
public void initToElements() {
this.to = new HollowObjectTypeDataElements[numSplits];
for(int i=0;i<to.length;i++) {
to[i].fixedLengthData = FixedLengthDataFactory.get((long)to[i].bitsPerRecord * (to[i].maxOrdinal + 1), to[i].memoryMode, to[i].memoryRecycler);
for(int fieldIdx=0;fieldIdx<from.schema.numFields();fieldIdx++) {
if(from.varLengthData[fieldIdx] != null) {
to[i].varLengthData[fieldIdx] = VariableLengthDataFactory.get(from.memoryMode, from.memoryRecycler);
}
}
}

for(int i=0;i<=from.maxOrdinal;i++) {
int toIndex = i & toMask;
int toOrdinal = i >> toOrdinalShift;
copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]);
to[i] = new HollowObjectTypeDataElements(schema, from.memoryMode, from.memoryRecycler);
}
return to;
}

private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDataElements from, int toMask, int toOrdinalShift) {
@Override
public void populateStats() {
long[][] varLengthSizes = new long[to.length][from.schema.numFields()];

for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
@@ -93,4 +62,24 @@ private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDa
}
}
}

@Override
public void copyRecords() {
final long[][] currentWriteVarLengthDataPointers = new long[to.length][from.schema.numFields()];

for(int i=0;i<to.length;i++) {
to[i].fixedLengthData = FixedLengthDataFactory.get((long)to[i].bitsPerRecord * (to[i].maxOrdinal + 1), to[i].memoryMode, to[i].memoryRecycler);
for(int fieldIdx=0;fieldIdx<from.schema.numFields();fieldIdx++) {
if(from.varLengthData[fieldIdx] != null) {
to[i].varLengthData[fieldIdx] = VariableLengthDataFactory.get(from.memoryMode, from.memoryRecycler);
}
}
}

for(int i=0;i<=from.maxOrdinal;i++) {
int toIndex = i & toMask;
int toOrdinal = i >> toOrdinalShift;
copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]);
}
}
}
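The splitter's remapping is the inverse of the joiner's: ordinal o lands in split o & toMask at local ordinal o >> toOrdinalShift, so round-robin distribution can leave the splits' maxOrdinals differing by one. A small sketch with a hypothetical ordinal count:

// Splitter-style round-robin distribution of ordinals; illustrative only.
public class SplitRemapSketch {
    public static void main(String[] args) {
        int numSplits = 4;                 // must be a power of 2
        int toMask = numSplits - 1;
        int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
        int fromMaxOrdinal = 10;

        int[] splitMaxOrdinal = new int[numSplits];
        java.util.Arrays.fill(splitMaxOrdinal, -1);
        for (int ordinal = 0; ordinal <= fromMaxOrdinal; ordinal++) {
            int toIndex = ordinal & toMask;
            int toOrdinal = ordinal >> toOrdinalShift;
            splitMaxOrdinal[toIndex] = toOrdinal;
        }
        System.out.println(java.util.Arrays.toString(splitMaxOrdinal)); // [2, 2, 2, 1]
    }
}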