From bbeeabd37aa0809cd914f4dcc01489d7277a021f Mon Sep 17 00:00:00 2001 From: zurdoron <51879220+zurdoron@users.noreply.github.com> Date: Wed, 6 May 2020 11:54:17 +0300 Subject: [PATCH 1/9] expose read states in producer --- .../netflix/hollow/api/producer/AbstractHollowProducer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 9f5e9338d5..9c24051408 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -796,7 +796,10 @@ private void validate(ProducerListeners listeners, HollowProducer.ReadState read } } - + public ReadStateHelper getHelper(){ + return readStates; + } + private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState); From ad57fa00eb654b1ebca9cf9073b1b55ac3143c30 Mon Sep 17 00:00:00 2001 From: zurdoron <51879220+zurdoron@users.noreply.github.com> Date: Wed, 6 May 2020 11:55:32 +0300 Subject: [PATCH 2/9] expose current in read state helper --- .../java/com/netflix/hollow/api/producer/ReadStateHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java b/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java index 57e1993e4b..41667beaf7 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java @@ -29,7 +29,7 @@ * @author Tim Taylor {@literal<tim@toolbear.io>} * */ -final class ReadStateHelper { +final public class ReadStateHelper { static ReadStateHelper newDeltaChain() { return new ReadStateHelper(null, null); } @@ -86,7 +86,7 @@ ReadStateHelper rollback() { return new ReadStateHelper(newReadState(current.getVersion(), pending.getStateEngine()), null); } - ReadState current() { + public ReadState current() { return current; } From 6b6873ba1bd6abbbf1b462cb9ca0e6fc16ac38ab Mon Sep 17 00:00:00 2001 From: zurdoron <51879220+zurdoron@users.noreply.github.com> Date: Wed, 6 May 2020 12:02:08 +0300 Subject: [PATCH 3/9] parallel enhancements for high cardinality types --- .../hollow/core/memory/ByteArrayOrdinalMap.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java index 4ddeaa42e9..fbb4e19afe 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java +++ b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java @@ -371,7 +371,7 @@ public void compact(ThreadSafeBitSet usedOrdinals) { } } - Arrays.sort(populatedReverseKeys); + Arrays.parallelSort(populatedReverseKeys); SegmentedByteArray arr = byteData.getUnderlyingArray(); long currentCopyPointer = 0; @@ -543,19 +543,18 @@ private void populateNewHashArray(AtomicLongArray newKeys, long[] valuesToAdd, i int modBitmask = newKeys.length() - 1; - for (int i = 0; i < length; i++) { + IntStream.range(0,length).parallel().forEach(i->{ long value = valuesToAdd[i]; if (value != EMPTY_BUCKET_VALUE) { int hash = rehashPreviouslyAddedData(value); int bucket = hash & modBitmask; - while (newKeys.get(bucket) != EMPTY_BUCKET_VALUE) { + boolean isSet = false; + while(!isSet){ + isSet = newKeys.compareAndSet(bucket,EMPTY_BUCKET_VALUE,value); bucket = (bucket + 1) & modBitmask; } - // Volatile store not required, could use plain store - // See VarHandles for JDK >= 9 - newKeys.lazySet(bucket, value); } - } + }); } /** @@ -603,4 +602,4 @@ public static int getOrdinal(long pointerAndOrdinal) { return (int) (pointerAndOrdinal >>> BITS_PER_POINTER); } -} \ No newline at end of file +} From f166ed9c783442989457645985ebf41dbfffb675 Mon Sep 17 00:00:00 2001 From: zurdoron <51879220+zurdoron@users.noreply.github.com> Date: Wed, 6 May 2020 12:14:33 +0300 Subject: [PATCH 4/9] add import --- .../java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java index fbb4e19afe..0eb11cad81 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java +++ b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.concurrent.atomic.AtomicLongArray; +import java.util.stream.IntStream; /** * This data structure maps byte sequences to ordinals. This is a hash table. From cc741f019cbdf2ab9d3bb1774ff8f6ae0e05a0a6 Mon Sep 17 00:00:00 2001 From: "zur.do@moonactive.com" <zur.do@moonactive.com> Date: Sat, 25 Jul 2020 17:58:04 +0300 Subject: [PATCH 5/9] parallelism enhancements for high cardinality types --- .../api/producer/AbstractHollowProducer.java | 4 -- .../hollow/api/producer/ReadStateHelper.java | 4 +- .../core/memory/ByteArrayOrdinalMap.java | 37 +++++++++++++------ 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 9c24051408..9c1e548989 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -796,10 +796,6 @@ private void validate(ProducerListeners listeners, HollowProducer.ReadState read } } - public ReadStateHelper getHelper(){ - return readStates; - } - private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState); diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java b/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java index 41667beaf7..57e1993e4b 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/ReadStateHelper.java @@ -29,7 +29,7 @@ * @author Tim Taylor {@literal<tim@toolbear.io>} * */ -final public class ReadStateHelper { +final class ReadStateHelper { static ReadStateHelper newDeltaChain() { return new ReadStateHelper(null, null); } @@ -86,7 +86,7 @@ ReadStateHelper rollback() { return new ReadStateHelper(newReadState(current.getVersion(), pending.getStateEngine()), null); } - public ReadState current() { + ReadState current() { return current; } diff --git a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java index 0eb11cad81..2c1e4ea081 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java +++ b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java @@ -16,11 +16,16 @@ */ package com.netflix.hollow.core.memory; +import com.netflix.hollow.api.error.HollowException; import com.netflix.hollow.core.memory.encoding.HashCodes; import com.netflix.hollow.core.memory.encoding.VarInt; import com.netflix.hollow.core.memory.pool.WastefulRecycler; +import com.netflix.hollow.core.util.SimultaneousExecutor; + import java.util.Arrays; import java.util.BitSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLongArray; import java.util.stream.IntStream; @@ -544,18 +549,26 @@ private void populateNewHashArray(AtomicLongArray newKeys, long[] valuesToAdd, i int modBitmask = newKeys.length() - 1; - IntStream.range(0,length).parallel().forEach(i->{ - long value = valuesToAdd[i]; - if (value != EMPTY_BUCKET_VALUE) { - int hash = rehashPreviouslyAddedData(value); - int bucket = hash & modBitmask; - boolean isSet = false; - while(!isSet){ - isSet = newKeys.compareAndSet(bucket,EMPTY_BUCKET_VALUE,value); - bucket = (bucket + 1) & modBitmask; - } - } - }); + ForkJoinPool forkJoinPool = new ForkJoinPool(); + + try { + forkJoinPool.submit(()->{ + IntStream.range(0,length).parallel().forEach(i->{ + long value = valuesToAdd[i]; + if (value != EMPTY_BUCKET_VALUE) { + int hash = rehashPreviouslyAddedData(value); + int bucket = hash & modBitmask; + boolean isSet = false; + while(!isSet){ + isSet = newKeys.compareAndSet(bucket,EMPTY_BUCKET_VALUE,value); + bucket = (bucket + 1) & modBitmask; + } + } + }); + }).get(); + } catch (Exception e) { + throw new HollowException("Unable to populate byte array map", e); + } } /** From 466f4abcd98103c306926bf4c4cdcb759f37f606 Mon Sep 17 00:00:00 2001 From: "zur.do@moonactive.com" <zur.do@moonactive.com> Date: Sat, 25 Jul 2020 18:07:49 +0300 Subject: [PATCH 6/9] remove unused imports --- .../com/netflix/hollow/api/producer/AbstractHollowProducer.java | 1 + .../com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 9c1e548989..7d4a22976b 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -796,6 +796,7 @@ private void validate(ProducerListeners listeners, HollowProducer.ReadState read } } + private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState); diff --git a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java index 2c1e4ea081..7e6e6c3c5f 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java +++ b/hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java @@ -20,11 +20,9 @@ import com.netflix.hollow.core.memory.encoding.HashCodes; import com.netflix.hollow.core.memory.encoding.VarInt; import com.netflix.hollow.core.memory.pool.WastefulRecycler; -import com.netflix.hollow.core.util.SimultaneousExecutor; import java.util.Arrays; import java.util.BitSet; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLongArray; import java.util.stream.IntStream; From 68cb89c0b56099f76bec3afe3012dea1fce5aa0d Mon Sep 17 00:00:00 2001 From: "zur.do@moonactive.com" <zur.do@moonactive.com> Date: Sat, 25 Jul 2020 18:09:21 +0300 Subject: [PATCH 7/9] revert --- .../hollow/api/producer/AbstractHollowProducer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 7d4a22976b..61c9127c49 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -375,9 +375,9 @@ long runCycle( ReadStateHelper candidate = readStates.roundtrip(toVersion); cycleStatus.readState(candidate.pending()); - candidate = doIntegrityCheck ? + candidate = doIntegrityCheck ? checkIntegrity(listeners, candidate, artifacts, schemaChangedFromPriorVersion) : - noIntegrityCheck(candidate, artifacts); + noIntegrityCheck(candidate, artifacts); try { validate(listeners, candidate.pending()); @@ -728,11 +728,11 @@ private ReadStateHelper checkIntegrity( listeners.fireIntegrityCheckComplete(status); } } - + private ReadStateHelper noIntegrityCheck(ReadStateHelper readStates, Artifacts artifacts) throws IOException { ReadStateHelper result = readStates; - if(!readStates.hasCurrent() || + if(!readStates.hasCurrent() || (!readStates.current().getStateEngine().hasIdenticalSchemas(getWriteEngine()) && artifacts.snapshot != null)) { HollowReadStateEngine pending = readStates.pending().getStateEngine(); readSnapshot(artifacts.snapshot, pending); @@ -745,7 +745,7 @@ private ReadStateHelper noIntegrityCheck(ReadStateHelper readStates, Artifacts a } applyDelta(artifacts.delta, current); - + result = readStates.swap(); } } @@ -796,7 +796,7 @@ private void validate(ProducerListeners listeners, HollowProducer.ReadState read } } - + private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState); From 93111b924285acf1208219fece6b96a04560737d Mon Sep 17 00:00:00 2001 From: "zur.do@moonactive.com" <zur.do@moonactive.com> Date: Mon, 27 Jul 2020 15:16:06 +0300 Subject: [PATCH 8/9] Revert "revert" This reverts commit 68cb89c0 --- .../hollow/api/producer/AbstractHollowProducer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 61c9127c49..7d4a22976b 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -375,9 +375,9 @@ long runCycle( ReadStateHelper candidate = readStates.roundtrip(toVersion); cycleStatus.readState(candidate.pending()); - candidate = doIntegrityCheck ? + candidate = doIntegrityCheck ? checkIntegrity(listeners, candidate, artifacts, schemaChangedFromPriorVersion) : - noIntegrityCheck(candidate, artifacts); + noIntegrityCheck(candidate, artifacts); try { validate(listeners, candidate.pending()); @@ -728,11 +728,11 @@ private ReadStateHelper checkIntegrity( listeners.fireIntegrityCheckComplete(status); } } - + private ReadStateHelper noIntegrityCheck(ReadStateHelper readStates, Artifacts artifacts) throws IOException { ReadStateHelper result = readStates; - if(!readStates.hasCurrent() || + if(!readStates.hasCurrent() || (!readStates.current().getStateEngine().hasIdenticalSchemas(getWriteEngine()) && artifacts.snapshot != null)) { HollowReadStateEngine pending = readStates.pending().getStateEngine(); readSnapshot(artifacts.snapshot, pending); @@ -745,7 +745,7 @@ private ReadStateHelper noIntegrityCheck(ReadStateHelper readStates, Artifacts a } applyDelta(artifacts.delta, current); - + result = readStates.swap(); } } @@ -796,7 +796,7 @@ private void validate(ProducerListeners listeners, HollowProducer.ReadState read } } - + private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState); From 7b5ade372a97e6a523035ddac6e0fbba867a1327 Mon Sep 17 00:00:00 2001 From: Sunjeet <sstattla@yahoo.com> Date: Fri, 7 Feb 2020 00:50:59 +0200 Subject: [PATCH 9/9] Producer Listener support refactored for code reuse (#448) * Producer Listener support refactored * Address PR feedback (cherry picked from commit b080244ad3255c0de4e015c8415c7b1c50736f66) --- .../com/netflix/hollow/api/producer/AbstractHollowProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 7d4a22976b..9f5e9338d5 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -796,7 +796,7 @@ private void validate(ProducerListeners listeners, HollowProducer.ReadState read } } - + private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState);