Skip to content

Commit

Permalink
feat: 15448: In-memory virtual maps support (#15575)
Browse files Browse the repository at this point in the history
Fixes: #15448
Reviewed-by: Anthony Petrov <[email protected]>, Ivan Malygin <[email protected]>, Oleg Mazurov <[email protected]>
Signed-off-by: Artem Ananev <[email protected]>
  • Loading branch information
artemananiev authored Dec 3, 2024
1 parent 8eecfe6 commit c0dfdfc
Show file tree
Hide file tree
Showing 17 changed files with 920 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@State(Scope.Thread)
@Warmup(iterations = 1)
@Measurement(iterations = 5)
public abstract class VirtualMapBench extends VirtualMapBaseBench {
public class VirtualMapBench extends VirtualMapBaseBench {

String benchmarkName() {
return "VirtualMapBench";
Expand Down Expand Up @@ -261,4 +261,66 @@ public void read() throws Exception {

afterTest(true);
}

@Benchmark
public void queueMode() throws Exception {
    beforeTest("queueMode");

    // Shadow array of expected values, only populated when verification is on
    final long[] expected = new long[verify ? maxKey : 0];
    VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap = createMap(expected);

    final int roundsPerCopy = maxKey / numFiles;
    final int updateLag = numRecords / 2;
    for (int i = 0; i < maxKey; i++) {
        // Add a fresh entry at the head of the queue
        long value = nextValue();
        virtualMap.put(new BenchmarkKey(i), new BenchmarkValue(value));
        if (verify) {
            expected[i] = value;
        }
        // Update the entry in the middle of the queue
        if (i >= updateLag) {
            final int updateIndex = i - updateLag;
            value = nextValue();
            virtualMap.put(new BenchmarkKey(updateIndex), new BenchmarkValue(value));
            if (verify) {
                expected[updateIndex] = value;
            }
        }
        // Remove the entry at the tail of the queue
        if (i >= numRecords) {
            final int removeIndex = i - numRecords;
            virtualMap.remove(new BenchmarkKey(removeIndex));
            if (verify) {
                expected[removeIndex] = 0;
            }
        }

        // Periodically make a fast copy so background hashing/flushing can proceed
        if (i % roundsPerCopy == 0) {
            virtualMap = copyMap(virtualMap);
        }
    }

    // Ensure the map is done with hashing/merging/flushing
    final var finalMap = flushMap(virtualMap);

    verifyMap(expected, finalMap);

    afterTest(true, () -> {
        finalMap.release();
        finalMap.getDataSource().close();
    });
}

/**
 * Runs the {@code queueMode} benchmark standalone, outside of the JMH harness.
 * Useful for profiling or quick local runs.
 */
public static void main(String[] args) throws Exception {
    final VirtualMapBench benchmark = new VirtualMapBench();
    benchmark.setup();
    benchmark.beforeTest();
    benchmark.queueMode();
    benchmark.afterTest();
    benchmark.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.swirlds.common.config.singleton.ConfigurationHolder;
import com.swirlds.common.constructable.ClassConstructorPair;
import com.swirlds.common.constructable.ConstructableRegistry;
import com.swirlds.common.constructable.ConstructableRegistryException;
Expand All @@ -37,12 +38,15 @@
import com.swirlds.common.merkle.MerkleNode;
import com.swirlds.common.merkle.crypto.MerkleCryptoFactory;
import com.swirlds.common.merkle.route.MerkleRoute;
import com.swirlds.config.api.Configuration;
import com.swirlds.config.extensions.test.fixtures.TestConfigBuilder;
import com.swirlds.merkledb.config.MerkleDbConfig;
import com.swirlds.merkledb.test.fixtures.ExampleFixedSizeVirtualValue;
import com.swirlds.merkledb.test.fixtures.ExampleFixedSizeVirtualValueSerializer;
import com.swirlds.merkledb.test.fixtures.ExampleLongKeyFixedSize;
import com.swirlds.virtualmap.VirtualMap;
import com.swirlds.virtualmap.config.VirtualMapConfig;
import com.swirlds.virtualmap.config.VirtualMapConfig_;
import com.swirlds.virtualmap.datasource.VirtualDataSourceBuilder;
import com.swirlds.virtualmap.internal.cache.VirtualNodeCache;
import com.swirlds.virtualmap.internal.merkle.VirtualInternalNode;
Expand Down Expand Up @@ -377,4 +381,85 @@ void serializeFlushedAndUnflushedData(final int count) throws InterruptedExcepti

MILLISECONDS.sleep(100); // Hack. Release methods may not have finished their work yet.
}

/**
 * Verifies that a virtual map kept (mostly) in memory survives a serialize/deserialize
 * round trip: the restored map must hash to the same value as the original.
 * A very high copy flush threshold keeps copies in the node cache instead of on disk.
 */
@Test
void inMemoryModeSerde() throws IOException {
    // Raise the flush threshold so copies are retained in memory rather than flushed
    final Configuration configuration = new TestConfigBuilder()
            .withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1_000_000)
            .getOrCreateConfig();
    ConfigurationHolder.getInstance().setConfiguration(configuration);

    VirtualMap<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> map = new VirtualMap<>(
            "inMemoryModeSerde", KEY_SERIALIZER, VALUE_SERIALIZER, constructBuilder(), configuration);

    // Copy 0
    for (int i = 0; i < 100; i++) {
        final ExampleLongKeyFixedSize key = new ExampleLongKeyFixedSize(i);
        final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + i);
        map.put(key, value);
    }

    // Copy 1
    final VirtualMap<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> copy1 = map.copy();
    map.release();
    map = copy1;
    for (int i = 100; i < 200; i++) {
        final ExampleLongKeyFixedSize key = new ExampleLongKeyFixedSize(i);
        final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + i);
        map.put(key, value);
    }
    // Add more entries to copy 1 to force it to flush
    for (int i = 100000; i < 120000; i++) {
        final ExampleLongKeyFixedSize key = new ExampleLongKeyFixedSize(i);
        final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + i);
        map.put(key, value);
    }

    // Copies 2..99: each adds 100 new entries and removes 100 older ones, so
    // subsequent copies stay small and remain in memory (below the flush threshold)
    final int nCopies = 100;
    for (int copyNo = 2; copyNo < nCopies; copyNo++) {
        final VirtualMap<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> copy = map.copy();
        map.release();
        map = copy;
        for (int i = 0; i < 100; i++) {
            final int toAdd = copyNo * 100 + i;
            final ExampleLongKeyFixedSize keyToAdd = new ExampleLongKeyFixedSize(toAdd);
            final ExampleFixedSizeVirtualValue value = new ExampleFixedSizeVirtualValue(1000000 + toAdd);
            map.put(keyToAdd, value);
            final int toRemove = (copyNo - 2) * 100 + i + 75;
            final ExampleLongKeyFixedSize keyToRemove = new ExampleLongKeyFixedSize(toRemove);
            final ExampleFixedSizeVirtualValue removed = map.remove(keyToRemove);
            assertNotNull(removed);
        }
    }

    // Final copy
    final VirtualMap<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> copyF = map.copy();
    map.release();
    map = copyF;

    // And one more to make sure copyF is immutable and can be serialized
    final VirtualMap<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> copyOneMore = map.copy();

    final Hash originalHash = MerkleCryptoFactory.getInstance().digestTreeSync(copyF);

    // Serialize copyF to an in-memory buffer (tmp dir holds any on-disk artifacts)
    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
    final Path tmp = LegacyTemporaryFileBuilder.buildTemporaryDirectory("inMemoryModeSerde", configuration);
    try (final SerializableDataOutputStream out = new SerializableDataOutputStream(bout)) {
        copyF.serialize(out, tmp);
    }

    // Deserialize into a fresh map and compare hashes
    final ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
    map = new VirtualMap<>(configuration);
    try (final SerializableDataInputStream in = new SerializableDataInputStream(bin)) {
        map.deserialize(in, tmp, 3);
    }

    // An extra copy makes the deserialized map immutable so it can be hashed
    final VirtualMap<ExampleLongKeyFixedSize, ExampleFixedSizeVirtualValue> copyAfter = map.copy();

    final Hash restoredHash = MerkleCryptoFactory.getInstance().digestTreeSync(map);
    assertEquals(originalHash, restoredHash);

    copyOneMore.release();
    copyAfter.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void buildATree() {
// hashes are calculated and put to the cache. Here the cache doesn't contain hashes for dirty leaves
// (bananaLeaf0, appleLeaf0, cherryLeaf0). Should dirtyHashes() include these leaf nodes? Currently
// it doesn't
cache0.prepareForFlush();
validateDirtyInternals(Set.of(rootInternal0, leftInternal0), cache0.dirtyHashesForFlush(4));

// ROUND 1: Add D and E.
Expand Down Expand Up @@ -173,6 +174,7 @@ void buildATree() {
dateLeaf1,
appleLeaf1,
eggplantLeaf1));
cache1.prepareForFlush();
validateDirtyInternals(
Set.of(rootInternal1, leftInternal1, rightInternal1, leftLeftInternal1), cache1.dirtyHashesForFlush(8));

Expand Down Expand Up @@ -244,6 +246,7 @@ void buildATree() {
figLeaf2,
bananaLeaf2,
grapeLeaf2));
cache2.prepareForFlush();
validateDirtyInternals(
Set.of(rootInternal2, leftInternal2, rightInternal2, leftRightInternal2, rightLeftInternal2),
cache2.dirtyHashesForFlush(12));
Expand Down Expand Up @@ -327,6 +330,7 @@ void buildATree() {
cache3.putHash(leftInternal3);
cache3.putHash(rootInternal3);
cache3.seal();
cache3.prepareForFlush();
validateDirtyInternals(
Set.of(
rootInternal3,
Expand Down Expand Up @@ -608,6 +612,7 @@ void dirtyLeaves_allInSameVersionAllDeleted() {
cache.deleteLeaf(cherryLeaf(1));
cache.seal();

cache.prepareForFlush();
final List<VirtualLeafRecord<TestKey, TestValue>> leaves =
cache.dirtyLeavesForFlush(-1, -1).toList();
assertEquals(0, leaves.size(), "All leaves should be missing");
Expand Down Expand Up @@ -672,6 +677,7 @@ void dirtyLeaves_differentVersionsNoneDeleted() {
cache0.merge();
cache1.merge();

cache2.prepareForFlush();
final Set<VirtualLeafRecord<TestKey, TestValue>> leaves =
cache2.dirtyLeavesForFlush(4, 8).collect(Collectors.toSet());
assertEquals(5, leaves.size(), "All leaves should be dirty");
Expand Down Expand Up @@ -711,6 +717,7 @@ void dirtyLeaves_differentVersionsSomeDeleted() {
cache0.merge();
cache1.merge();

cache2.prepareForFlush();
final Set<VirtualLeafRecord<TestKey, TestValue>> leaves =
cache2.dirtyLeavesForFlush(3, 6).collect(Collectors.toSet());
assertEquals(4, leaves.size(), "Some leaves should be dirty");
Expand Down Expand Up @@ -751,6 +758,7 @@ void dirtyLeaves_differentVersionsAllDeleted() {
cache0.merge();
cache1.merge();

cache2.prepareForFlush();
final List<VirtualLeafRecord<TestKey, TestValue>> leaves =
cache2.dirtyLeavesForFlush(-1, -1).toList();
assertEquals(0, leaves.size(), "All leaves should be deleted");
Expand All @@ -771,6 +779,7 @@ void dirtyInternals_allInSameVersionNoneDeleted() {
cache0.putHash(rightLeftInternal());
cache0.seal();

cache0.prepareForFlush();
final List<VirtualHashRecord> internals = cache0.dirtyHashesForFlush(12).toList();
assertEquals(6, internals.size(), "All internals should be dirty");
assertEquals(rootInternal(), internals.get(0), "Unexpected internal");
Expand Down Expand Up @@ -798,6 +807,7 @@ void dirtyInternals_differentVersionsNoneDeleted() {
cache1.seal();
cache0.merge();

cache1.prepareForFlush();
final List<VirtualHashRecord> internals = cache1.dirtyHashesForFlush(12).toList();
assertEquals(6, internals.size(), "All internals should be dirty");
assertEquals(
Expand Down Expand Up @@ -841,6 +851,7 @@ void dirtyInternals_differentVersionsSomeDeleted() {
cache0.merge();
cache1.merge();

cache2.prepareForFlush();
final List<VirtualHashRecord> internals = cache2.dirtyHashesForFlush(12).toList();
assertEquals(6, internals.size(), "All internals should be dirty");
assertEquals(
Expand Down Expand Up @@ -886,6 +897,7 @@ void dirtyInternals_differentVersionsAllDeleted() {
cache0.merge();
cache1.merge();

cache2.prepareForFlush();
final List<VirtualHashRecord> internals = cache2.dirtyHashesForFlush(-1).toList();
assertEquals(0, internals.size(), "No internals should be dirty");
}
Expand Down Expand Up @@ -917,6 +929,7 @@ void dirtyLeaves_flushesAndHashing() {
cache1.dirtyLeavesForHash(2, 4).toList();
assertEquals(List.of(appleLeaf(3), cherryLeaf(4)), dirtyLeaves1);

cache0.prepareForFlush();
// Flush version 0
final Set<VirtualLeafRecord<TestKey, TestValue>> dirtyLeaves0F =
cache0.dirtyLeavesForFlush(1, 2).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.swirlds.virtualmap.internal.cache; // NOSONAR: Needed to benchmark internal classes

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
Expand All @@ -27,6 +28,7 @@
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
Expand All @@ -38,19 +40,34 @@
@Warmup(iterations = 1, time = 10)
@Measurement(iterations = 10, time = 15)
public class ConcurrentArrayBench {
private static final Random RANDOM = new Random();

private static final Random RANDOM = new Random(12341);
private static final int MIN_THREADS = 2;
private static final int DEFAULT_ARRAY_SIZE = 1000;

@Param({"1000000"})
public int size;

private ConcurrentArray<Long> concurrentArray;
private ExecutorService executor;

// Used in the benchmark to create concurrent arrays from streams
private long[] source;

@Setup(Level.Trial)
public void setupInfrastructure() {
    // Leave one core for the harness itself, but never drop below the minimum
    final int workerThreads = Math.max(MIN_THREADS, Runtime.getRuntime().availableProcessors() - 1);
    executor = Executors.newFixedThreadPool(workerThreads);
}

@Setup(Level.Trial)
public void setupSourceArray() {
    // Fill the source data once per trial; the seeded RNG keeps runs reproducible
    source = new long[size];
    Arrays.setAll(source, i -> RANDOM.nextLong());
}

@TearDown(Level.Trial)
public void tearDown() {
executor.shutdownNow();
Expand All @@ -65,6 +82,11 @@ public void setupPerIteration() {
}
}

@Benchmark
public void benchmarkCreateFromStream() {
    // Measures ConcurrentArray construction from a boxed stream over the source data
    final var boxedValues = Arrays.stream(source).boxed();
    concurrentArray = new ConcurrentArray<>(boxedValues);
}

@Benchmark
public void benchmarkGet() {
concurrentArray.get(RANDOM.nextInt(DEFAULT_ARRAY_SIZE));
Expand Down
Loading

0 comments on commit c0dfdfc

Please sign in to comment.