Skip to content

Commit d89e530

Browse files
committed
WIP: delta and rev delta ser requires numShards
1 parent 6e0a4a2 commit d89e530

File tree

6 files changed

+52
-112
lines changed

6 files changed

+52
-112
lines changed

hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java

+6-24
Original file line numberDiff line numberDiff line change
@@ -232,26 +232,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE
232232
}
233233

234234
@Override
235-
public void calculateDelta() {
236-
calculateDelta(previousCyclePopulated, currentCyclePopulated);
237-
}
238-
239-
@Override
240-
public void writeDelta(DataOutputStream dos) throws IOException {
241-
writeCalculatedDelta(dos);
242-
}
243-
244-
@Override
245-
public void calculateReverseDelta() {
246-
calculateDelta(currentCyclePopulated, previousCyclePopulated);
247-
}
248-
249-
@Override
250-
public void writeReverseDelta(DataOutputStream dos) throws IOException {
251-
writeCalculatedDelta(dos);
252-
}
253-
254-
private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
235+
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, int numShards) {
255236
maxOrdinal = ordinalMap.maxOrdinal();
256237
numListsInDelta = new int[numShards];
257238
numElementsInDelta = new long[numShards];
@@ -318,16 +299,17 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
318299
}
319300
}
320301

321-
private void writeCalculatedDelta(DataOutputStream os) throws IOException {
302+
@Override
303+
public void writeCalculatedDelta(DataOutputStream os, int numShards, int[] maxShardOrdinal) throws IOException {
322304
/// for unsharded blobs, support pre v2.1.0 clients
323305
if(numShards == 1) {
324-
writeCalculatedDeltaShard(os, 0);
306+
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
325307
} else {
326308
/// overall max ordinal
327309
VarInt.writeVInt(os, maxOrdinal);
328310

329311
for(int i=0;i<numShards;i++) {
330-
writeCalculatedDeltaShard(os, i);
312+
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
331313
}
332314
}
333315

@@ -338,7 +320,7 @@ private void writeCalculatedDelta(DataOutputStream os) throws IOException {
338320
}
339321

340322

341-
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber) throws IOException {
323+
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal) throws IOException {
342324
/// 1) max shard ordinal
343325
VarInt.writeVInt(os, maxShardOrdinal[shardNumber]);
344326

hollow/src/main/java/com/netflix/hollow/core/write/HollowMapTypeWriteState.java

+6-24
Original file line numberDiff line numberDiff line change
@@ -332,26 +332,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE
332332
}
333333

334334
@Override
335-
public void calculateDelta() {
336-
calculateDelta(previousCyclePopulated, currentCyclePopulated);
337-
}
338-
339-
@Override
340-
public void writeDelta(DataOutputStream dos) throws IOException {
341-
writeCalculatedDelta(dos);
342-
}
343-
344-
@Override
345-
public void calculateReverseDelta() {
346-
calculateDelta(currentCyclePopulated, previousCyclePopulated);
347-
}
348-
349-
@Override
350-
public void writeReverseDelta(DataOutputStream dos) throws IOException {
351-
writeCalculatedDelta(dos);
352-
}
353-
354-
private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
335+
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, int numShards) {
355336
maxOrdinal = ordinalMap.maxOrdinal();
356337
int bitsPerMapFixedLengthPortion = bitsPerMapSizeValue + bitsPerMapPointer;
357338
int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement;
@@ -465,16 +446,17 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
465446
}
466447
}
467448

468-
private void writeCalculatedDelta(DataOutputStream os) throws IOException {
449+
@Override
450+
public void writeCalculatedDelta(DataOutputStream os, int numShards, int[] maxShardOrdinal) throws IOException {
469451
/// for unsharded blobs, support pre v2.1.0 clients
470452
if(numShards == 1) {
471-
writeCalculatedDeltaShard(os, 0);
453+
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
472454
} else {
473455
/// overall max ordinal
474456
VarInt.writeVInt(os, maxOrdinal);
475457

476458
for(int i=0;i<numShards;i++) {
477-
writeCalculatedDeltaShard(os, i);
459+
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
478460
}
479461
}
480462

@@ -484,7 +466,7 @@ private void writeCalculatedDelta(DataOutputStream os) throws IOException {
484466
deltaRemovedOrdinals = null;
485467
}
486468

487-
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber) throws IOException {
469+
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal) throws IOException {
488470

489471
int bitsPerMapFixedLengthPortion = bitsPerMapSizeValue + bitsPerMapPointer;
490472
int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement;

hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java

+3-23
Original file line numberDiff line numberDiff line change
@@ -243,28 +243,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE
243243
}
244244

245245
@Override
246-
public void calculateDelta() {
247-
calculateDelta(previousCyclePopulated, currentCyclePopulated, numShards);
248-
}
249-
250-
@Override
251-
public void writeDelta(DataOutputStream dos) throws IOException {
252-
LOG.log(Level.FINE, String.format("Writing delta with num shards = %s, max shard ordinals = %s", numShards, Arrays.toString(maxShardOrdinal)));
253-
writeCalculatedDelta(dos, numShards, maxShardOrdinal);
254-
}
255-
256-
@Override
257-
public void calculateReverseDelta() {
258-
calculateDelta(currentCyclePopulated, previousCyclePopulated, revNumShards); // SNAP: TODO: extend passing of revNumShards for other types
259-
}
260-
261-
@Override
262-
public void writeReverseDelta(DataOutputStream dos) throws IOException {
263-
LOG.log(Level.FINE, String.format("Writing reversedelta with num shards = %s, max shard ordinals = %s", revNumShards, Arrays.toString(revMaxShardOrdinal)));
264-
writeCalculatedDelta(dos, revNumShards, revMaxShardOrdinal); // SNAP: TODO: extend passing of revNumShards and revMaxShardOrdinal for other types
265-
}
266-
267-
private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, int numShards) {
246+
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, int numShards) {
268247
maxOrdinal = ordinalMap.maxOrdinal();
269248
int numBitsPerRecord = fieldStats.getNumBitsPerRecord();
270249

@@ -311,7 +290,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
311290
}
312291
}
313292

314-
private void writeCalculatedDelta(DataOutputStream os, int numShards, int[] maxShardOrdinal) throws IOException {
293+
@Override
294+
public void writeCalculatedDelta(DataOutputStream os, int numShards, int[] maxShardOrdinal) throws IOException {
315295
/// for unsharded blobs, support pre v2.1.0 clients
316296
if(numShards == 1) {
317297
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);

hollow/src/main/java/com/netflix/hollow/core/write/HollowSetTypeWriteState.java

+10-28
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ public HollowSetSchema getSchema() {
7070
return (HollowSetSchema)schema;
7171
}
7272

73-
public void prepareForWrite() {
73+
public void prepareForWrite(int numShards) {
7474
super.prepareForWrite();
7575

76-
gatherStatistics();
76+
gatherStatistics(numShards);
7777
}
7878

79-
private void gatherStatistics() {
79+
private void gatherStatistics(int numShards) {
8080
maxOrdinal = ordinalMap.maxOrdinal();
8181

8282
gatherShardingStats(maxOrdinal);
@@ -85,7 +85,7 @@ private void gatherStatistics() {
8585
int maxSetSize = 0;
8686
ByteData data = ordinalMap.getByteData().getUnderlyingArray();
8787

88-
totalOfSetBuckets = new long[numShards];
88+
totalOfSetBuckets = new long[numShards];// SNAP: TODO: called for delta and rev delta
8989

9090
for(int i=0;i<=maxOrdinal;i++) {
9191
if(currentCyclePopulated.get(i) || previousCyclePopulated.get(i)) {
@@ -303,26 +303,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE
303303
}
304304

305305
@Override
306-
public void calculateDelta() {
307-
calculateDelta(previousCyclePopulated, currentCyclePopulated);
308-
}
309-
310-
@Override
311-
public void writeDelta(DataOutputStream dos) throws IOException {
312-
writeCalculatedDelta(dos);
313-
}
314-
315-
@Override
316-
public void calculateReverseDelta() {
317-
calculateDelta(currentCyclePopulated, previousCyclePopulated);
318-
}
319-
320-
@Override
321-
public void writeReverseDelta(DataOutputStream dos) throws IOException {
322-
writeCalculatedDelta(dos);
323-
}
324-
325-
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
306+
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, int numShards) {
326307
maxOrdinal = ordinalMap.maxOrdinal();
327308
int bitsPerSetFixedLengthPortion = bitsPerSetSizeValue + bitsPerSetPointer;
328309

@@ -430,16 +411,17 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet
430411
}
431412
}
432413

433-
private void writeCalculatedDelta(DataOutputStream os) throws IOException {
414+
@Override
415+
public void writeCalculatedDelta(DataOutputStream os, int numShards, int[] maxShardOrdinal) throws IOException {
434416
/// for unsharded blobs, support pre v2.1.0 clients
435417
if(numShards == 1) {
436-
writeCalculatedDeltaShard(os, 0);
418+
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
437419
} else {
438420
/// overall max ordinal
439421
VarInt.writeVInt(os, maxOrdinal);
440422

441423
for(int i=0;i<numShards;i++) {
442-
writeCalculatedDeltaShard(os, i);
424+
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
443425
}
444426
}
445427

@@ -449,7 +431,7 @@ private void writeCalculatedDelta(DataOutputStream os) throws IOException {
449431
deltaRemovedOrdinals = null;
450432
}
451433

452-
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber) throws IOException {
434+
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal) throws IOException {
453435

454436
int bitsPerSetFixedLengthPortion = bitsPerSetSizeValue + bitsPerSetPointer;
455437

hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java

+24-9
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import com.netflix.hollow.core.write.copy.HollowRecordCopier;
3333
import java.io.DataOutputStream;
3434
import java.io.IOException;
35+
import java.util.Arrays;
3536
import java.util.BitSet;
37+
import java.util.logging.Level;
3638
import java.util.logging.Logger;
3739

3840
/**
@@ -64,6 +66,9 @@ public abstract class HollowTypeWriteState {
6466
private boolean wroteData = false;
6567

6668
private boolean isNumShardsPinned; // if numShards is pinned using numShards annotation in data model
69+
protected int maxShardOrdinal[];
70+
protected int revMaxShardOrdinal[];
71+
6772

6873
public HollowTypeWriteState(HollowSchema schema, int numShards) {
6974
this(schema, numShards, false);
@@ -320,14 +325,28 @@ public boolean isRestored() {
320325

321326
public abstract void writeSnapshot(DataOutputStream dos) throws IOException;
322327

323-
public abstract void calculateDelta();
328+
public void calculateDelta() {
329+
calculateDelta(previousCyclePopulated, currentCyclePopulated, numShards);
330+
}
324331

325-
public abstract void writeDelta(DataOutputStream dos) throws IOException;
332+
public void calculateReverseDelta() {
333+
calculateDelta(currentCyclePopulated, previousCyclePopulated, revNumShards);
334+
}
326335

327-
public abstract void calculateReverseDelta();
336+
public void writeDelta(DataOutputStream dos) throws IOException {
337+
LOG.log(Level.FINE, String.format("Writing delta with num shards = %s, max shard ordinals = %s", numShards, Arrays.toString(maxShardOrdinal)));
338+
writeCalculatedDelta(dos, numShards, maxShardOrdinal);
339+
}
340+
341+
public void writeReverseDelta(DataOutputStream dos) throws IOException {
342+
LOG.log(Level.FINE, String.format("Writing reversedelta with num shards = %s, max shard ordinals = %s", revNumShards, Arrays.toString(revMaxShardOrdinal)));
343+
writeCalculatedDelta(dos, revNumShards, revMaxShardOrdinal);
344+
}
345+
346+
public abstract void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, int numShards);
347+
348+
public abstract void writeCalculatedDelta(DataOutputStream os, int numShards, int[] maxShardOrdinal) throws IOException;
328349

329-
public abstract void writeReverseDelta(DataOutputStream dos) throws IOException;
330-
331350
protected void restoreFrom(HollowTypeReadState readState) {
332351
if(previousCyclePopulated.cardinality() != 0 || currentCyclePopulated.cardinality() != 0)
333352
throw new IllegalStateException("Attempting to restore into a non-empty state (type " + schema.getName() + ")");
@@ -411,10 +430,6 @@ public boolean allowTypeResharding() {
411430
return stateEngine.allowTypeResharding();
412431
}
413432

414-
protected int maxShardOrdinal[];
415-
protected int revMaxShardOrdinal[];
416-
417-
418433
protected void gatherShardingStats(int maxOrdinal) {
419434
if(numShards == -1) {
420435
numShards = typeStateNumShards(maxOrdinal);

hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -479,10 +479,9 @@ public void testReshardingAllTypes() {
479479

480480
}
481481

482-
// producer doesn't support resharding for these types yet
483-
assertEquals(8, producer.getWriteEngine().getTypeState("SetOfString").getNumShards());
484-
assertEquals(4, producer.getWriteEngine().getTypeState("ListOfInteger").getNumShards());
485-
assertEquals(8, producer.getWriteEngine().getTypeState("MapOfStringToLong").getNumShards());
482+
assertEquals(16, producer.getWriteEngine().getTypeState("SetOfString").getNumShards());
483+
assertEquals(8, producer.getWriteEngine().getTypeState("ListOfInteger").getNumShards());
484+
assertEquals(16, producer.getWriteEngine().getTypeState("MapOfStringToLong").getNumShards());
486485

487486
producer.runCycle(ws -> {
488487
// still same num shards, because ghost records

0 commit comments

Comments
 (0)