Skip to content

Commit f72303e

Browse files
authored
HollowObjectTypeDataElements utilities for splitting and joining (#642)
* HollowObjectTypeDataElements utilities for splitting and joining * Assume non-uniform field width across shards, add tests for filtered fields * Restore private modifier, cleanup, more test * Work in tandem with isSkipTypeShardUpdateWithNoAdditions --------- Co-authored-by: Sunjeet Singh <[email protected]>
1 parent 59c4083 commit f72303e

11 files changed

+905
-61
lines changed

hollow/src/main/java/com/netflix/hollow/core/memory/FixedLengthDataFactory.java

+8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr
2222
}
2323
}
2424

25+
public static FixedLengthData get(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
26+
if (memoryMode.equals(MemoryMode.ON_HEAP)) {
27+
return new FixedLengthElementArray(memoryRecycler, numBits);
28+
} else {
29+
throw new UnsupportedOperationException("Memory mode " + memoryMode.name() + " not supported");
30+
}
31+
}
32+
2533
public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecycler) {
2634
if (fld instanceof FixedLengthElementArray) {
2735
((FixedLengthElementArray) fld).destroy(memoryRecycler);

hollow/src/main/java/com/netflix/hollow/core/memory/encoding/GapEncodedVariableLengthIntegerReader.java

+105
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
import com.netflix.hollow.core.memory.ByteDataArray;
2020
import com.netflix.hollow.core.memory.SegmentedByteArray;
2121
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
22+
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
2223
import com.netflix.hollow.core.read.HollowBlobInput;
2324
import com.netflix.hollow.core.util.IOUtils;
2425
import java.io.DataOutputStream;
2526
import java.io.IOException;
2627
import java.io.OutputStream;
28+
import java.util.ArrayList;
29+
import java.util.HashSet;
30+
import java.util.List;
2731

2832
public class GapEncodedVariableLengthIntegerReader {
2933

@@ -140,4 +144,105 @@ public static GapEncodedVariableLengthIntegerReader combine(GapEncodedVariableLe
140144

141145
return new GapEncodedVariableLengthIntegerReader(arr.getUnderlyingArray(), (int)arr.length());
142146
}
147+
148+
/**
149+
* Splits this {@code GapEncodedVariableLengthIntegerReader} into {@code numSplits} new instances.
150+
* The original data is not cleaned up.
151+
*
152+
* @param numSplits the number of instances to split into, should be a power of 2.
153+
* @return an array of {@code GapEncodedVariableLengthIntegerReader} instances populated with the results of the split.
154+
*/
155+
public GapEncodedVariableLengthIntegerReader[] split(int numSplits) {
156+
if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
157+
throw new IllegalStateException("Split should only be called with powers of 2, it was called with " + numSplits);
158+
}
159+
final int toMask = numSplits - 1;
160+
final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
161+
GapEncodedVariableLengthIntegerReader[] to = new GapEncodedVariableLengthIntegerReader[numSplits];
162+
163+
List<Integer> ordinals = new ArrayList<>();
164+
reset();
165+
while(nextElement() != Integer.MAX_VALUE) {
166+
ordinals.add(nextElement());
167+
advance();
168+
}
169+
170+
ByteDataArray[] splitOrdinals = new ByteDataArray[numSplits];
171+
int previousSplitOrdinal[] = new int[numSplits];
172+
for (int ordinal : ordinals) {
173+
int toIndex = ordinal & toMask;
174+
int toOrdinal = ordinal >> toOrdinalShift;
175+
if (splitOrdinals[toIndex] == null) {
176+
splitOrdinals[toIndex] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
177+
}
178+
VarInt.writeVInt(splitOrdinals[toIndex], toOrdinal - previousSplitOrdinal[toIndex]);
179+
previousSplitOrdinal[toIndex] = toOrdinal;
180+
}
181+
for(int i=0;i<numSplits;i++) {
182+
if (splitOrdinals[i] == null) {
183+
to[i] = EMPTY_READER;
184+
} else {
185+
to[i] = new GapEncodedVariableLengthIntegerReader(splitOrdinals[i].getUnderlyingArray(), (int) splitOrdinals[i].length());
186+
}
187+
}
188+
189+
return to;
190+
}
191+
192+
/**
193+
* Join an array of {@code GapEncodedVariableLengthIntegerReader} instances into one.
194+
* The original data is not cleaned up.
195+
*
196+
* @param from the array of {@code GapEncodedVariableLengthIntegerReader} to join, should have a power of 2 number of elements.
197+
* @return an instance of {@code GapEncodedVariableLengthIntegerReader} with the joined result.
198+
*/
199+
public static GapEncodedVariableLengthIntegerReader join(GapEncodedVariableLengthIntegerReader[] from) {
200+
if (from==null) {
201+
throw new IllegalStateException("Join invoked on a null input array");
202+
}
203+
if (from.length<=0 || !((from.length&(from.length-1))==0)) {
204+
throw new IllegalStateException("Join should only be called with powers of 2, it was called with " + from.length);
205+
}
206+
207+
int numSplits = from.length;
208+
final int fromMask = numSplits - 1;
209+
final int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
210+
int joinedMaxOrdinal = -1;
211+
212+
HashSet<Integer>[] fromOrdinals = new HashSet[from.length];
213+
for (int i=0;i<from.length;i++) {
214+
fromOrdinals[i] = new HashSet<>();
215+
if (from[i] == null) {
216+
continue;
217+
}
218+
from[i].reset();
219+
220+
while(from[i].nextElement() != Integer.MAX_VALUE) {
221+
int splitOrdinal = from[i].nextElement();
222+
fromOrdinals[i].add(splitOrdinal);
223+
joinedMaxOrdinal = Math.max(joinedMaxOrdinal, splitOrdinal*numSplits + i);
224+
from[i].advance();
225+
}
226+
}
227+
228+
ByteDataArray toRemovals = null;
229+
int previousOrdinal = 0;
230+
for (int ordinal=0;ordinal<=joinedMaxOrdinal;ordinal++) {
231+
int fromIndex = ordinal & fromMask;
232+
int fromOrdinal = ordinal >> fromOrdinalShift;
233+
if (fromOrdinals[fromIndex].contains(fromOrdinal)) {
234+
if (toRemovals == null) {
235+
toRemovals = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
236+
}
237+
VarInt.writeVInt(toRemovals, ordinal - previousOrdinal);
238+
previousOrdinal = ordinal;
239+
}
240+
}
241+
242+
if (toRemovals == null) {
243+
return EMPTY_READER;
244+
} else {
245+
return new GapEncodedVariableLengthIntegerReader(toRemovals.getUnderlyingArray(), (int) toRemovals.length());
246+
}
247+
}
143248
}

hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectDeltaHistoricalStateCreator.java

+11-58
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.netflix.hollow.core.read.engine.object;
1818

1919
import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
20+
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.copyRecord;
21+
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.varLengthSize;
2022

2123
import com.netflix.hollow.core.memory.SegmentedByteArray;
2224
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
@@ -71,7 +73,11 @@ public void populateHistory() {
7173
int ordinal = iter.next();
7274
while(ordinal != ORDINAL_NONE) {
7375
ordinalMapping.put(ordinal, nextOrdinal);
74-
copyRecord(ordinal);
76+
77+
int shard = ordinal & shardNumberMask;
78+
int shardOrdinal = ordinal >> shardOrdinalShift;
79+
copyRecord(historicalDataElements, nextOrdinal, stateEngineDataElements[shard], shardOrdinal, currentWriteVarLengthDataPointers);
80+
nextOrdinal++;
7581

7682
ordinal = iter.next();
7783
}
@@ -109,7 +115,9 @@ private void populateStats() {
109115

110116
for(int i=0;i<totalVarLengthSizes.length;i++) {
111117
if(stateEngineDataElements[0].varLengthData[i] != null) {
112-
totalVarLengthSizes[i] += varLengthSize(ordinal, i);
118+
int shard = ordinal & shardNumberMask;
119+
int shardOrdinal = ordinal >> shardOrdinalShift;
120+
totalVarLengthSizes[i] += varLengthSize(stateEngineDataElements[shard], shardOrdinal, i);
113121
}
114122
}
115123

@@ -125,66 +133,11 @@ private void populateStats() {
125133
historicalDataElements.bitsPerField[i] = (64 - Long.numberOfLeadingZeros(totalVarLengthSizes[i] + 1)) + 1;
126134
}
127135

128-
historicalDataElements.nullValueForField[i] = (1L << historicalDataElements.bitsPerField[i]) - 1;
136+
historicalDataElements.nullValueForField[i] = historicalDataElements.bitsPerField[i] == 64 ? -1L : (1L << historicalDataElements.bitsPerField[i]) - 1;
129137
historicalDataElements.bitOffsetPerField[i] = historicalDataElements.bitsPerRecord;
130138
historicalDataElements.bitsPerRecord += historicalDataElements.bitsPerField[i];
131139
}
132140

133141
ordinalMapping = new IntMap(removedEntryCount);
134142
}
135-
136-
private long varLengthSize(int ordinal, int fieldIdx) {
137-
int shard = ordinal & shardNumberMask;
138-
int shardOrdinal = ordinal >> shardOrdinalShift;
139-
140-
int numBitsForField = stateEngineDataElements[shard].bitsPerField[fieldIdx];
141-
long currentBitOffset = ((long)stateEngineDataElements[shard].bitsPerRecord * shardOrdinal) + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx];
142-
long endByte = stateEngineDataElements[shard].fixedLengthData.getElementValue(currentBitOffset, numBitsForField) & (1L << (numBitsForField - 1)) - 1;
143-
long startByte = shardOrdinal != 0 ? stateEngineDataElements[shard].fixedLengthData.getElementValue(currentBitOffset - stateEngineDataElements[shard].bitsPerRecord, numBitsForField) & (1L << (numBitsForField - 1)) - 1 : 0;
144-
145-
return endByte - startByte;
146-
}
147-
148-
private void copyRecord(int ordinal) {
149-
int shard = ordinal & shardNumberMask;
150-
int shardOrdinal = ordinal >> shardOrdinalShift;
151-
152-
for(int i=0;i<historicalDataElements.schema.numFields();i++) {
153-
if(historicalDataElements.varLengthData[i] == null) {
154-
long value = stateEngineDataElements[shard].fixedLengthData.getLargeElementValue(((long)shardOrdinal * stateEngineDataElements[shard].bitsPerRecord) + stateEngineDataElements[shard].bitOffsetPerField[i], stateEngineDataElements[shard].bitsPerField[i]);
155-
historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], value);
156-
} else {
157-
long fromStartByte = varLengthStartByte(shard, shardOrdinal, i);
158-
long fromEndByte = varLengthEndByte(shard, shardOrdinal, i);
159-
long size = fromEndByte - fromStartByte;
160-
161-
historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], currentWriteVarLengthDataPointers[i] + size);
162-
historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);
163-
164-
currentWriteVarLengthDataPointers[i] += size;
165-
}
166-
}
167-
168-
nextOrdinal++;
169-
}
170-
171-
private long varLengthStartByte(int shard, int translatedOrdinal, int fieldIdx) {
172-
if(translatedOrdinal == 0)
173-
return 0;
174-
175-
int numBitsForField = stateEngineDataElements[shard].bitsPerField[fieldIdx];
176-
long currentBitOffset = ((long)stateEngineDataElements[shard].bitsPerRecord * translatedOrdinal) + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx];
177-
long startByte = stateEngineDataElements[shard].fixedLengthData.getElementValue(currentBitOffset - stateEngineDataElements[shard].bitsPerRecord, numBitsForField) & (1L << (numBitsForField - 1)) - 1;
178-
179-
return startByte;
180-
}
181-
182-
private long varLengthEndByte(int shard, int translatedOrdinal, int fieldIdx) {
183-
int numBitsForField = stateEngineDataElements[shard].bitsPerField[fieldIdx];
184-
long currentBitOffset = ((long)stateEngineDataElements[shard].bitsPerRecord * translatedOrdinal) + stateEngineDataElements[shard].bitOffsetPerField[fieldIdx];
185-
long endByte = stateEngineDataElements[shard].fixedLengthData.getElementValue(currentBitOffset, numBitsForField) & (1L << (numBitsForField - 1)) - 1;
186-
187-
return endByte;
188-
}
189-
190143
}

hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElements.java

+61
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,65 @@ public void destroy() {
211211
}
212212
}
213213

214+
static long varLengthStartByte(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
215+
if(ordinal == 0)
216+
return 0;
217+
218+
int numBitsForField = from.bitsPerField[fieldIdx];
219+
long currentBitOffset = ((long)from.bitsPerRecord * ordinal) + from.bitOffsetPerField[fieldIdx];
220+
long startByte = from.fixedLengthData.getElementValue(currentBitOffset - from.bitsPerRecord, numBitsForField) & (1L << (numBitsForField - 1)) - 1;
221+
222+
return startByte;
223+
}
224+
225+
static long varLengthEndByte(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
226+
int numBitsForField = from.bitsPerField[fieldIdx];
227+
long currentBitOffset = ((long)from.bitsPerRecord * ordinal) + from.bitOffsetPerField[fieldIdx];
228+
long endByte = from.fixedLengthData.getElementValue(currentBitOffset, numBitsForField) & (1L << (numBitsForField - 1)) - 1;
229+
230+
return endByte;
231+
}
232+
233+
static long varLengthSize(HollowObjectTypeDataElements from, int ordinal, int fieldIdx) {
234+
int numBitsForField = from.bitsPerField[fieldIdx];
235+
long fromBitOffset = ((long)from.bitsPerRecord*ordinal) + from.bitOffsetPerField[fieldIdx];
236+
long fromEndByte = from.fixedLengthData.getElementValue(fromBitOffset, numBitsForField) & (1L << (numBitsForField - 1)) - 1;
237+
long fromStartByte = ordinal != 0 ? from.fixedLengthData.getElementValue(fromBitOffset - from.bitsPerRecord, numBitsForField) & (1L << (numBitsForField - 1)) - 1 : 0;
238+
return fromEndByte - fromStartByte;
239+
}
240+
241+
static void copyRecord(HollowObjectTypeDataElements to, int toOrdinal, HollowObjectTypeDataElements from, int fromOrdinal, long[] currentWriteVarLengthDataPointers) {
242+
for(int fieldIndex=0;fieldIndex<to.schema.numFields();fieldIndex++) {
243+
if(to.varLengthData[fieldIndex] == null) {
244+
long value = from.fixedLengthData.getLargeElementValue(((long)fromOrdinal * from.bitsPerRecord) + from.bitOffsetPerField[fieldIndex], from.bitsPerField[fieldIndex]);
245+
to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], value);
246+
} else {
247+
long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex);
248+
long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex);
249+
long size = fromEndByte - fromStartByte;
250+
251+
to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size);
252+
to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size);
253+
254+
currentWriteVarLengthDataPointers[fieldIndex] += size;
255+
}
256+
}
257+
}
258+
259+
static void writeNullField(HollowObjectTypeDataElements target, int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
260+
if(target.varLengthData[fieldIndex] != null) {
261+
writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
262+
} else {
263+
writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit);
264+
}
265+
}
266+
267+
static void writeNullVarLengthField(HollowObjectTypeDataElements target, int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
268+
long writeValue = (1L << (target.bitsPerField[fieldIndex] - 1)) | currentWriteVarLengthDataPointers[fieldIndex];
269+
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], writeValue);
270+
}
271+
272+
static void writeNullFixedLengthField(HollowObjectTypeDataElements target, int fieldIndex, long currentWriteFixedLengthStartBit) {
273+
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], target.nullValueForField[fieldIndex]);
274+
}
214275
}

0 commit comments

Comments
 (0)