 */

package com.netflix.hollow.core.write;

+import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE;
+
import com.netflix.hollow.core.index.FieldPaths;
import com.netflix.hollow.core.memory.ByteData;
import com.netflix.hollow.core.memory.ByteDataArray;
...
import java.util.logging.Level;
import java.util.logging.Logger;

-import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE;
-
public class HollowMapTypeWriteState extends HollowTypeWriteState {
    private static final Logger LOG = Logger.getLogger(HollowMapTypeWriteState.class.getName());

@@ -77,20 +77,32 @@ public void prepareForWrite() {
    }

    private void gatherStatistics() {
-        if(numShards == -1)
-            calculateNumShards();
-        revNumShards = numShards;
+        int maxOrdinal = ordinalMap.maxOrdinal();
+        if(numShards == -1) {
+            numShards = calculateNumShards(maxOrdinal);
+            revNumShards = numShards;
+        } else {
+            revNumShards = numShards;
+            if (allowTypeResharding()) {
+                numShards = calculateNumShards(maxOrdinal);
+                if (numShards != revNumShards) { // re-sharding
+                    // limit numShards to 2x or .5x of prevShards per producer cycle
+                    numShards = numShards > revNumShards ? revNumShards * 2 : revNumShards / 2;
+
+                    LOG.info(String.format("Num shards for type %s changing from %s to %s", schema.getName(), revNumShards, numShards));
+                    addReshardingHeader(revNumShards, numShards); // SNAP: TODO: Here,
+                }
+            }
+        }
+
+        maxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, numShards);
+        if (revNumShards > 0) {
+            revMaxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, revNumShards);
+        }

        int maxKeyOrdinal = 0;
        int maxValueOrdinal = 0;

-        int maxOrdinal = ordinalMap.maxOrdinal();
-
-        maxShardOrdinal = new int[numShards];
-        int minRecordLocationsPerShard = (maxOrdinal + 1) / numShards;
-        for(int i=0;i<numShards;i++)
-            maxShardOrdinal[i] = (i < ((maxOrdinal + 1) & (numShards - 1))) ? minRecordLocationsPerShard : minRecordLocationsPerShard - 1;
-
        int maxMapSize = 0;
        ByteData data = ordinalMap.getByteData().getUnderlyingArray();

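For reference, the resharding path above never jumps straight to the newly calculated shard count: within a single producer cycle the count can at most double or halve, so a type whose projected size changes sharply converges over several cycles. Below is a minimal standalone sketch of that limiter and of the per-shard ordinal distribution (plain Java, independent of the Hollow write-state classes; ReshardingSketch and limitShardChange are illustrative names, and the calcMaxShardOrdinal body here is assumed to mirror the inline code it replaces above):

// Standalone sketch (not part of the PR) of the two pieces of shard math used above.
public class ReshardingSketch {

    // Mirrors the new limiting logic in gatherStatistics(): when resharding is allowed
    // and the freshly calculated shard count differs from last cycle's count, move at
    // most one power-of-two step (2x grow, 0.5x shrink) per producer cycle.
    static int limitShardChange(int calculatedShards, int prevShards) {
        if (calculatedShards == prevShards)
            return prevShards;
        return calculatedShards > prevShards ? prevShards * 2 : prevShards / 2;
    }

    // Same distribution as the inline code removed above (and assumed to be what the
    // calcMaxShardOrdinal(...) helper computes): records are assigned to shards by
    // ordinal modulo a power-of-two shard count, so each shard's max local ordinal is
    // either minRecordLocationsPerShard or one less.
    static int[] calcMaxShardOrdinal(int maxOrdinal, int numShards) {
        int[] maxShardOrdinal = new int[numShards];
        int minRecordLocationsPerShard = (maxOrdinal + 1) / numShards;
        for (int i = 0; i < numShards; i++)
            maxShardOrdinal[i] = (i < ((maxOrdinal + 1) & (numShards - 1)))
                    ? minRecordLocationsPerShard
                    : minRecordLocationsPerShard - 1;
        return maxShardOrdinal;
    }

    public static void main(String[] args) {
        // A type that wants 16 shards but previously had 2 converges 2 -> 4 -> 8 -> 16
        // over successive producer cycles rather than jumping straight to 16.
        int shards = 2;
        while (shards != 16) {
            shards = limitShardChange(16, shards);
            System.out.println("shards after cycle: " + shards);
        }

        // 10 records (maxOrdinal = 9) over 4 shards: shards 0 and 1 hold 3 records each
        // (max local ordinal 2), shards 2 and 3 hold 2 each (max local ordinal 1).
        System.out.println(java.util.Arrays.toString(calcMaxShardOrdinal(9, 4)));
    }
}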
@@ -143,10 +155,9 @@ private void gatherStatistics() {
        bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
    }

-    private void calculateNumShards() {
+    int calculateNumShards(int maxOrdinal) {
        int maxKeyOrdinal = 0;
        int maxValueOrdinal = 0;
-        int maxOrdinal = ordinalMap.maxOrdinal();

        int maxMapSize = 0;
        ByteData data = ordinalMap.getByteData().getUnderlyingArray();
@@ -194,9 +205,10 @@ private void calculateNumShards() {
        long projectedSizeOfType = (bitsPerMapSizeValue + bitsPerMapPointer) * (maxOrdinal + 1) / 8;
        projectedSizeOfType += ((bitsPerKeyElement + bitsPerValueElement) * totalOfMapBuckets) / 8;

-        numShards = 1;
-        while(stateEngine.getTargetMaxTypeShardSize() * numShards < projectedSizeOfType)
-            numShards *= 2;
+        int targetNumShards = 1;
+        while(stateEngine.getTargetMaxTypeShardSize() * targetNumShards < projectedSizeOfType)
+            targetNumShards *= 2;
+        return targetNumShards;
    }

    @Override
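The loop above, now returning its result rather than assigning the numShards field directly, picks the smallest power-of-two shard count whose per-shard footprint stays within the state engine's target max type shard size. A standalone sketch of the same arithmetic, with made-up example sizes (neither figure is claimed to be a Hollow default):

// Standalone sketch (not part of the PR) of the shard-count sizing loop.
public class ShardCountSketch {

    // Smallest power-of-two shard count such that
    // projectedSizeOfTypeBytes / shards <= targetMaxTypeShardSizeBytes,
    // matching the loop in calculateNumShards(...).
    static int calculateNumShards(long projectedSizeOfTypeBytes, long targetMaxTypeShardSizeBytes) {
        int targetNumShards = 1;
        while (targetMaxTypeShardSizeBytes * targetNumShards < projectedSizeOfTypeBytes)
            targetNumShards *= 2;
        return targetNumShards;
    }

    public static void main(String[] args) {
        // Example inputs: a type projected at ~300 MB with a 25 MB per-shard target
        // lands on 16 shards (16 * 25 MB = 400 MB covers it; 8 * 25 MB would not).
        long projected = 300L * 1024 * 1024;
        long target = 25L * 1024 * 1024;
        System.out.println(calculateNumShards(projected, target)); // prints 16
    }
}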