From e51e0429063f3191bd6345b1b52cb0849601d63b Mon Sep 17 00:00:00 2001 From: Gabriel Fukushima Date: Tue, 9 Jan 2024 09:35:12 +1000 Subject: [PATCH] Add --X-trie-log subcommand (#6303) * Add x-trie-log subcommand for one-off trie log backlog prune Signed-off-by: Simon Dudley Signed-off-by: Gabriel Fukushima --------- Signed-off-by: Simon Dudley Signed-off-by: Gabriel Fukushima Co-authored-by: Simon Dudley --- .../storage/RocksDbUsageHelper.java | 10 +- .../storage/StorageSubCommand.java | 6 +- .../subcommands/storage/TrieLogHelper.java | 361 ++++++++++++++++++ .../storage/TrieLogSubCommand.java | 147 +++++++ .../besu/controller/BesuController.java | 18 +- .../controller/BesuControllerBuilder.java | 3 +- .../storage/TrieLogHelperTest.java | 265 +++++++++++++ .../BonsaiWorldStateKeyValueStorage.java | 2 +- .../trie/bonsai/trielog/TrieLogPruner.java | 17 +- 9 files changed, 814 insertions(+), 15 deletions(-) create mode 100644 besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelper.java create mode 100644 besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java create mode 100644 besu/src/test/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelperTest.java diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/RocksDbUsageHelper.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/RocksDbUsageHelper.java index 4bfd9f42951..4a11abcdf02 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/RocksDbUsageHelper.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/RocksDbUsageHelper.java @@ -34,17 +34,17 @@ static void printUsageForColumnFamily( final RocksDB rocksdb, final ColumnFamilyHandle cfHandle, final PrintWriter out) throws RocksDBException, NumberFormatException { final String size = rocksdb.getProperty(cfHandle, "rocksdb.estimate-live-data-size"); + final String numberOfKeys = rocksdb.getProperty(cfHandle, "rocksdb.estimate-num-keys"); boolean emptyColumnFamily = false; - if (!size.isEmpty() && !size.isBlank()) { + if (!size.isBlank() && !numberOfKeys.isBlank()) { try { final long sizeLong = Long.parseLong(size); + final long numberOfKeysLong = Long.parseLong(numberOfKeys); final String totalSstFilesSize = rocksdb.getProperty(cfHandle, "rocksdb.total-sst-files-size"); final long totalSstFilesSizeLong = - !totalSstFilesSize.isEmpty() && !totalSstFilesSize.isBlank() - ? Long.parseLong(totalSstFilesSize) - : 0; - if (sizeLong == 0) { + !totalSstFilesSize.isBlank() ? Long.parseLong(totalSstFilesSize) : 0; + if (sizeLong == 0 && numberOfKeysLong == 0) { emptyColumnFamily = true; } diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/StorageSubCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/StorageSubCommand.java index e46ffde2739..0dbfa3d22c5 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/StorageSubCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/StorageSubCommand.java @@ -45,7 +45,11 @@ description = "This command provides storage related actions.", mixinStandardHelpOptions = true, versionProvider = VersionProvider.class, - subcommands = {StorageSubCommand.RevertVariablesStorage.class, RocksDbSubCommand.class}) + subcommands = { + StorageSubCommand.RevertVariablesStorage.class, + RocksDbSubCommand.class, + TrieLogSubCommand.class + }) public class StorageSubCommand implements Runnable { /** The constant COMMAND_NAME. */ diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelper.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelper.java new file mode 100644 index 00000000000..52dbe559034 --- /dev/null +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelper.java @@ -0,0 +1,361 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.cli.subcommands.storage; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.hyperledger.besu.controller.BesuController.DATABASE_PATH; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.PrintWriter; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.tuweni.bytes.Bytes32; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Helper class for counting and pruning trie logs */ +public class TrieLogHelper { + private static final String TRIE_LOG_FILE = "trieLogsToRetain"; + private static final long BATCH_SIZE = 20_000; + private static final int ROCKSDB_MAX_INSERTS_PER_TRANSACTION = 1000; + private static final Logger LOG = LoggerFactory.getLogger(TrieLogHelper.class); + + static void prune( + final DataStorageConfiguration config, + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final MutableBlockchain blockchain, + final Path dataDirectoryPath) { + final String batchFileNameBase = + dataDirectoryPath.resolve(DATABASE_PATH).resolve(TRIE_LOG_FILE).toString(); + + validatePruneConfiguration(config); + + final long layersToRetain = config.getUnstable().getBonsaiTrieLogRetentionThreshold(); + + final long chainHeight = blockchain.getChainHeadBlockNumber(); + + final long lastBlockNumberToRetainTrieLogsFor = chainHeight - layersToRetain + 1; + + if (!validPruneRequirements(blockchain, chainHeight, lastBlockNumberToRetainTrieLogsFor)) { + return; + } + + final long numberOfBatches = calculateNumberofBatches(layersToRetain); + + processTrieLogBatches( + rootWorldStateStorage, + blockchain, + chainHeight, + lastBlockNumberToRetainTrieLogsFor, + numberOfBatches, + batchFileNameBase); + + if (rootWorldStateStorage.streamTrieLogKeys(layersToRetain).count() == layersToRetain) { + deleteFiles(batchFileNameBase, numberOfBatches); + LOG.info("Prune ran successfully. Enjoy some disk space back! \uD83D\uDE80"); + } else { + LOG.error("Prune failed. Re-run the subcommand to load the trie logs from file."); + } + } + + private static void processTrieLogBatches( + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final MutableBlockchain blockchain, + final long chainHeight, + final long lastBlockNumberToRetainTrieLogsFor, + final long numberOfBatches, + final String batchFileNameBase) { + + for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) { + + final long firstBlockOfBatch = chainHeight - ((batchNumber - 1) * BATCH_SIZE); + + final long lastBlockOfBatch = + Math.max(chainHeight - (batchNumber * BATCH_SIZE), lastBlockNumberToRetainTrieLogsFor); + + final List trieLogKeys = + getTrieLogKeysForBlocks(blockchain, firstBlockOfBatch, lastBlockOfBatch); + + saveTrieLogBatches(batchFileNameBase, rootWorldStateStorage, batchNumber, trieLogKeys); + } + + LOG.info("Clear trie logs..."); + rootWorldStateStorage.clearTrieLog(); + + for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) { + restoreTrieLogBatches(rootWorldStateStorage, batchNumber, batchFileNameBase); + } + } + + private static void saveTrieLogBatches( + final String batchFileNameBase, + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final long batchNumber, + final List trieLogKeys) { + + LOG.info("Saving trie logs to retain in file (batch {})...", batchNumber); + + try { + saveTrieLogsInFile(trieLogKeys, rootWorldStateStorage, batchNumber, batchFileNameBase); + } catch (IOException e) { + LOG.error("Error saving trie logs to file: {}", e.getMessage()); + throw new RuntimeException(e); + } + } + + private static void restoreTrieLogBatches( + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final long batchNumber, + final String batchFileNameBase) { + + try { + LOG.info("Restoring trie logs retained from batch {}...", batchNumber); + recreateTrieLogs(rootWorldStateStorage, batchNumber, batchFileNameBase); + } catch (IOException e) { + LOG.error("Error recreating trie logs from batch {}: {}", batchNumber, e.getMessage()); + throw new RuntimeException(e); + } + } + + private static void deleteFiles(final String batchFileNameBase, final long numberOfBatches) { + + LOG.info("Deleting files..."); + + for (long batchNumber = 1; batchNumber <= numberOfBatches; batchNumber++) { + File file = new File(batchFileNameBase + "-" + batchNumber); + if (file.exists()) { + file.delete(); + } + } + } + + private static List getTrieLogKeysForBlocks( + final MutableBlockchain blockchain, + final long firstBlockOfBatch, + final long lastBlockOfBatch) { + final List trieLogKeys = new ArrayList<>(); + for (long i = firstBlockOfBatch; i >= lastBlockOfBatch; i--) { + final Optional header = blockchain.getBlockHeader(i); + header.ifPresentOrElse( + blockHeader -> trieLogKeys.add(blockHeader.getHash()), + () -> LOG.error("Error retrieving block")); + } + return trieLogKeys; + } + + private static long calculateNumberofBatches(final long layersToRetain) { + return layersToRetain / BATCH_SIZE + ((layersToRetain % BATCH_SIZE == 0) ? 0 : 1); + } + + private static boolean validPruneRequirements( + final MutableBlockchain blockchain, + final long chainHeight, + final long lastBlockNumberToRetainTrieLogsFor) { + if (lastBlockNumberToRetainTrieLogsFor < 0) { + throw new IllegalArgumentException( + "Trying to retain more trie logs than chain length (" + + chainHeight + + "), skipping pruning"); + } + + final Optional finalizedBlockHash = blockchain.getFinalized(); + + if (finalizedBlockHash.isEmpty()) { + throw new RuntimeException("No finalized block present, can't safely run trie log prune"); + } else { + final Hash finalizedHash = finalizedBlockHash.get(); + final Optional finalizedBlockHeader = blockchain.getBlockHeader(finalizedHash); + if (finalizedBlockHeader.isPresent() + && finalizedBlockHeader.get().getNumber() < lastBlockNumberToRetainTrieLogsFor) { + throw new IllegalArgumentException( + "Trying to prune more layers than the finalized block height, skipping pruning"); + } + } + return true; + } + + private static void recreateTrieLogs( + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final long batchNumber, + final String batchFileNameBase) + throws IOException { + // process in chunk to avoid OOM + + IdentityHashMap trieLogsToRetain = + readTrieLogsFromFile(batchFileNameBase, batchNumber); + final int chunkSize = ROCKSDB_MAX_INSERTS_PER_TRANSACTION; + List keys = new ArrayList<>(trieLogsToRetain.keySet()); + + for (int startIndex = 0; startIndex < keys.size(); startIndex += chunkSize) { + processTransactionChunk(startIndex, chunkSize, keys, trieLogsToRetain, rootWorldStateStorage); + } + } + + private static void processTransactionChunk( + final int startIndex, + final int chunkSize, + final List keys, + final IdentityHashMap trieLogsToRetain, + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage) { + + var updater = rootWorldStateStorage.updater(); + int endIndex = Math.min(startIndex + chunkSize, keys.size()); + + for (int i = startIndex; i < endIndex; i++) { + byte[] key = keys.get(i); + byte[] value = trieLogsToRetain.get(key); + updater.getTrieLogStorageTransaction().put(key, value); + LOG.info("Key({}): {}", i, Bytes32.wrap(key).toShortHexString()); + } + + updater.getTrieLogStorageTransaction().commit(); + } + + private static void validatePruneConfiguration(final DataStorageConfiguration config) { + checkArgument( + config.getUnstable().getBonsaiTrieLogRetentionThreshold() + >= config.getBonsaiMaxLayersToLoad(), + String.format( + "--Xbonsai-trie-log-retention-threshold minimum value is %d", + config.getBonsaiMaxLayersToLoad())); + checkArgument( + config.getUnstable().getBonsaiTrieLogPruningLimit() > 0, + String.format( + "--Xbonsai-trie-log-pruning-limit=%d must be greater than 0", + config.getUnstable().getBonsaiTrieLogPruningLimit())); + checkArgument( + config.getUnstable().getBonsaiTrieLogPruningLimit() + > config.getUnstable().getBonsaiTrieLogRetentionThreshold(), + String.format( + "--Xbonsai-trie-log-pruning-limit=%d must greater than --Xbonsai-trie-log-retention-threshold=%d", + config.getUnstable().getBonsaiTrieLogPruningLimit(), + config.getUnstable().getBonsaiTrieLogRetentionThreshold())); + } + + private static void saveTrieLogsInFile( + final List trieLogsKeys, + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final long batchNumber, + final String batchFileNameBase) + throws IOException { + + File file = new File(batchFileNameBase + "-" + batchNumber); + if (file.exists()) { + LOG.error("File already exists, skipping file creation"); + return; + } + + try (FileOutputStream fos = new FileOutputStream(file)) { + ObjectOutputStream oos = new ObjectOutputStream(fos); + oos.writeObject(getTrieLogs(trieLogsKeys, rootWorldStateStorage)); + } catch (IOException e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + private static IdentityHashMap readTrieLogsFromFile( + final String batchFileNameBase, final long batchNumber) { + + IdentityHashMap trieLogs; + try (FileInputStream fis = new FileInputStream(batchFileNameBase + "-" + batchNumber); + ObjectInputStream ois = new ObjectInputStream(fis)) { + + trieLogs = (IdentityHashMap) ois.readObject(); + + } catch (IOException | ClassNotFoundException e) { + + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + + return trieLogs; + } + + private static IdentityHashMap getTrieLogs( + final List trieLogKeys, final BonsaiWorldStateKeyValueStorage rootWorldStateStorage) { + IdentityHashMap trieLogsToRetain = new IdentityHashMap<>(); + + LOG.info("Obtaining trielogs from db, this may take a few minutes..."); + trieLogKeys.forEach( + hash -> + rootWorldStateStorage + .getTrieLog(hash) + .ifPresent(trieLog -> trieLogsToRetain.put(hash.toArrayUnsafe(), trieLog))); + return trieLogsToRetain; + } + + static TrieLogCount getCount( + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + final int limit, + final Blockchain blockchain) { + final AtomicInteger total = new AtomicInteger(); + final AtomicInteger canonicalCount = new AtomicInteger(); + final AtomicInteger forkCount = new AtomicInteger(); + final AtomicInteger orphanCount = new AtomicInteger(); + rootWorldStateStorage + .streamTrieLogKeys(limit) + .map(Bytes32::wrap) + .map(Hash::wrap) + .forEach( + hash -> { + total.getAndIncrement(); + blockchain + .getBlockHeader(hash) + .ifPresentOrElse( + (header) -> { + long number = header.getNumber(); + final Optional headerByNumber = + blockchain.getBlockHeader(number); + if (headerByNumber.isPresent() + && headerByNumber.get().getHash().equals(hash)) { + canonicalCount.getAndIncrement(); + } else { + forkCount.getAndIncrement(); + } + }, + orphanCount::getAndIncrement); + }); + + return new TrieLogCount(total.get(), canonicalCount.get(), forkCount.get(), orphanCount.get()); + } + + static void printCount(final PrintWriter out, final TrieLogCount count) { + out.printf( + "trieLog count: %s\n - canonical count: %s\n - fork count: %s\n - orphaned count: %s\n", + count.total, count.canonicalCount, count.forkCount, count.orphanCount); + } + + record TrieLogCount(int total, int canonicalCount, int forkCount, int orphanCount) {} +} diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java new file mode 100644 index 00000000000..bf75fd6eb9a --- /dev/null +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java @@ -0,0 +1,147 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.cli.subcommands.storage; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.hyperledger.besu.cli.util.VersionProvider; +import org.hyperledger.besu.controller.BesuController; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.storage.StorageProvider; +import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner; +import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; +import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; + +import java.io.PrintWriter; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.ParentCommand; + +/** The Trie Log subcommand. */ +@Command( + name = "x-trie-log", + description = "Manipulate trie logs", + mixinStandardHelpOptions = true, + versionProvider = VersionProvider.class, + subcommands = {TrieLogSubCommand.CountTrieLog.class, TrieLogSubCommand.PruneTrieLog.class}) +public class TrieLogSubCommand implements Runnable { + + @SuppressWarnings("UnusedVariable") + @ParentCommand + private static StorageSubCommand parentCommand; + + @SuppressWarnings("unused") + @CommandLine.Spec + private CommandLine.Model.CommandSpec spec; // Picocli injects reference to command spec + + @Override + public void run() { + final PrintWriter out = spec.commandLine().getOut(); + spec.commandLine().usage(out); + } + + private static BesuController createBesuController() { + return parentCommand.parentCommand.buildController(); + } + + @Command( + name = "count", + description = "This command counts all the trie logs", + mixinStandardHelpOptions = true, + versionProvider = VersionProvider.class) + static class CountTrieLog implements Runnable { + + @SuppressWarnings("unused") + @ParentCommand + private TrieLogSubCommand parentCommand; + + @SuppressWarnings("unused") + @CommandLine.Spec + private CommandLine.Model.CommandSpec spec; // Picocli injects reference to command spec + + @Override + public void run() { + TrieLogContext context = getTrieLogContext(); + + final PrintWriter out = spec.commandLine().getOut(); + + out.println("Counting trie logs..."); + TrieLogHelper.printCount( + out, + TrieLogHelper.getCount( + context.rootWorldStateStorage, Integer.MAX_VALUE, context.blockchain)); + } + } + + @Command( + name = "prune", + description = + "This command prunes all trie log layers below the retention threshold, including orphaned trie logs.", + mixinStandardHelpOptions = true, + versionProvider = VersionProvider.class) + static class PruneTrieLog implements Runnable { + + @SuppressWarnings("unused") + @ParentCommand + private TrieLogSubCommand parentCommand; + + @SuppressWarnings("unused") + @CommandLine.Spec + private CommandLine.Model.CommandSpec spec; // Picocli injects reference to command spec + + @Override + public void run() { + TrieLogContext context = getTrieLogContext(); + final Path dataDirectoryPath = + Paths.get( + TrieLogSubCommand.parentCommand.parentCommand.dataDir().toAbsolutePath().toString()); + TrieLogHelper.prune( + context.config(), + context.rootWorldStateStorage(), + context.blockchain(), + dataDirectoryPath); + } + } + + record TrieLogContext( + DataStorageConfiguration config, + BonsaiWorldStateKeyValueStorage rootWorldStateStorage, + MutableBlockchain blockchain) {} + + private static TrieLogContext getTrieLogContext() { + Configurator.setLevel(LoggerFactory.getLogger(TrieLogPruner.class).getName(), Level.DEBUG); + checkNotNull(parentCommand); + BesuController besuController = createBesuController(); + final DataStorageConfiguration config = besuController.getDataStorageConfiguration(); + checkArgument( + DataStorageFormat.BONSAI.equals(config.getDataStorageFormat()), + "Subcommand only works with data-storage-format=BONSAI"); + + final StorageProvider storageProvider = besuController.getStorageProvider(); + final BonsaiWorldStateKeyValueStorage rootWorldStateStorage = + (BonsaiWorldStateKeyValueStorage) + storageProvider.createWorldStateStorage(DataStorageFormat.BONSAI); + final MutableBlockchain blockchain = besuController.getProtocolContext().getBlockchain(); + return new TrieLogContext(config, rootWorldStateStorage, blockchain); + } +} diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java index bb46003b24b..12efc4a9df5 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuController.java @@ -37,6 +37,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration; import org.hyperledger.besu.ethereum.storage.StorageProvider; +import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; import java.io.Closeable; import java.io.IOException; @@ -77,6 +78,7 @@ public class BesuController implements java.io.Closeable { private final SyncState syncState; private final EthPeers ethPeers; private final StorageProvider storageProvider; + private final DataStorageConfiguration dataStorageConfiguration; /** * Instantiates a new Besu controller. @@ -96,6 +98,9 @@ public class BesuController implements java.io.Closeable { * @param nodeKey the node key * @param closeables the closeables * @param additionalPluginServices the additional plugin services + * @param ethPeers the eth peers + * @param storageProvider the storage provider + * @param dataStorageConfiguration the data storage configuration */ BesuController( final ProtocolSchedule protocolSchedule, @@ -114,7 +119,8 @@ public class BesuController implements java.io.Closeable { final List closeables, final PluginServiceFactory additionalPluginServices, final EthPeers ethPeers, - final StorageProvider storageProvider) { + final StorageProvider storageProvider, + final DataStorageConfiguration dataStorageConfiguration) { this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethProtocolManager = ethProtocolManager; @@ -132,6 +138,7 @@ public class BesuController implements java.io.Closeable { this.additionalPluginServices = additionalPluginServices; this.ethPeers = ethPeers; this.storageProvider = storageProvider; + this.dataStorageConfiguration = dataStorageConfiguration; } /** @@ -293,6 +300,15 @@ public PluginServiceFactory getAdditionalPluginServices() { return additionalPluginServices; } + /** + * Gets data storage configuration. + * + * @return the data storage configuration + */ + public DataStorageConfiguration getDataStorageConfiguration() { + return dataStorageConfiguration; + } + /** The type Builder. */ public static class Builder { diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 92cbf320e84..6bcfea3cad8 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -803,7 +803,8 @@ public BesuController build() { closeables, additionalPluginServices, ethPeers, - storageProvider); + storageProvider, + dataStorageConfiguration); } /** diff --git a/besu/src/test/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelperTest.java b/besu/src/test/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelperTest.java new file mode 100644 index 00000000000..82659701b1c --- /dev/null +++ b/besu/src/test/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogHelperTest.java @@ -0,0 +1,265 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.cli.subcommands.storage; + +import static org.hyperledger.besu.ethereum.worldstate.DataStorageFormat.BONSAI; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.storage.StorageProvider; +import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; +import org.hyperledger.besu.ethereum.worldstate.ImmutableDataStorageConfiguration; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TrieLogHelperTest { + + private static final StorageProvider storageProvider = new InMemoryKeyValueStorageProvider(); + private static BonsaiWorldStateKeyValueStorage inMemoryWorldState; + + @Mock private MutableBlockchain blockchain; + + @TempDir static Path dataDir; + + Path test; + static BlockHeader blockHeader1; + static BlockHeader blockHeader2; + static BlockHeader blockHeader3; + static BlockHeader blockHeader4; + static BlockHeader blockHeader5; + + @BeforeAll + public static void setup() throws IOException { + + blockHeader1 = new BlockHeaderTestFixture().number(1).buildHeader(); + blockHeader2 = new BlockHeaderTestFixture().number(2).buildHeader(); + blockHeader3 = new BlockHeaderTestFixture().number(3).buildHeader(); + blockHeader4 = new BlockHeaderTestFixture().number(4).buildHeader(); + blockHeader5 = new BlockHeaderTestFixture().number(5).buildHeader(); + + inMemoryWorldState = + new BonsaiWorldStateKeyValueStorage(storageProvider, new NoOpMetricsSystem()); + + var updater = inMemoryWorldState.updater(); + updater + .getTrieLogStorageTransaction() + .put(blockHeader1.getHash().toArrayUnsafe(), Bytes.fromHexString("0x01").toArrayUnsafe()); + updater + .getTrieLogStorageTransaction() + .put(blockHeader2.getHash().toArrayUnsafe(), Bytes.fromHexString("0x02").toArrayUnsafe()); + updater + .getTrieLogStorageTransaction() + .put(blockHeader3.getHash().toArrayUnsafe(), Bytes.fromHexString("0x03").toArrayUnsafe()); + updater + .getTrieLogStorageTransaction() + .put(blockHeader4.getHash().toArrayUnsafe(), Bytes.fromHexString("0x04").toArrayUnsafe()); + updater + .getTrieLogStorageTransaction() + .put(blockHeader5.getHash().toArrayUnsafe(), Bytes.fromHexString("0x05").toArrayUnsafe()); + updater.getTrieLogStorageTransaction().commit(); + } + + @BeforeEach + void createDirectory() throws IOException { + Files.createDirectories(dataDir.resolve("database")); + } + + @AfterEach + void deleteDirectory() throws IOException { + Files.deleteIfExists(dataDir.resolve("database")); + } + + void mockBlockchainBase() { + when(blockchain.getChainHeadBlockNumber()).thenReturn(5L); + when(blockchain.getFinalized()).thenReturn(Optional.of(blockHeader3.getBlockHash())); + when(blockchain.getBlockHeader(any(Hash.class))).thenReturn(Optional.of(blockHeader3)); + } + + @Test + public void prune() { + + DataStorageConfiguration dataStorageConfiguration = + ImmutableDataStorageConfiguration.builder() + .dataStorageFormat(BONSAI) + .bonsaiMaxLayersToLoad(2L) + .unstable( + ImmutableDataStorageConfiguration.Unstable.builder() + .bonsaiTrieLogRetentionThreshold(3) + .build() + .withBonsaiTrieLogRetentionThreshold(3)) + .build(); + + mockBlockchainBase(); + when(blockchain.getBlockHeader(5)).thenReturn(Optional.of(blockHeader5)); + when(blockchain.getBlockHeader(4)).thenReturn(Optional.of(blockHeader4)); + when(blockchain.getBlockHeader(3)).thenReturn(Optional.of(blockHeader3)); + + // assert trie logs that will be pruned exist before prune call + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader1.getHash()).get(), + Bytes.fromHexString("0x01").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader2.getHash()).get(), + Bytes.fromHexString("0x02").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader3.getHash()).get(), + Bytes.fromHexString("0x03").toArrayUnsafe()); + + TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir); + + // assert pruned trie logs are not in the DB + assertEquals(inMemoryWorldState.getTrieLog(blockHeader1.getHash()), Optional.empty()); + assertEquals(inMemoryWorldState.getTrieLog(blockHeader2.getHash()), Optional.empty()); + + // assert retained trie logs are in the DB + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader3.getHash()).get(), + Bytes.fromHexString("0x03").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader4.getHash()).get(), + Bytes.fromHexString("0x04").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader5.getHash()).get(), + Bytes.fromHexString("0x05").toArrayUnsafe()); + } + + @Test + public void cantPruneIfNoFinalizedIsFound() { + DataStorageConfiguration dataStorageConfiguration = + ImmutableDataStorageConfiguration.builder() + .dataStorageFormat(BONSAI) + .bonsaiMaxLayersToLoad(2L) + .unstable( + ImmutableDataStorageConfiguration.Unstable.builder() + .bonsaiTrieLogRetentionThreshold(2) + .build() + .withBonsaiTrieLogRetentionThreshold(2)) + .build(); + + when(blockchain.getChainHeadBlockNumber()).thenReturn(5L); + when(blockchain.getFinalized()).thenReturn(Optional.empty()); + + assertThrows( + RuntimeException.class, + () -> + TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir)); + } + + @Test + public void cantPruneIfUserRetainsMoreLayerThanExistingChainLength() { + DataStorageConfiguration dataStorageConfiguration = + ImmutableDataStorageConfiguration.builder() + .dataStorageFormat(BONSAI) + .bonsaiMaxLayersToLoad(2L) + .unstable( + ImmutableDataStorageConfiguration.Unstable.builder() + .bonsaiTrieLogRetentionThreshold(10) + .build() + .withBonsaiTrieLogRetentionThreshold(10)) + .build(); + + when(blockchain.getChainHeadBlockNumber()).thenReturn(5L); + + assertThrows( + IllegalArgumentException.class, + () -> + TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir)); + } + + @Test + public void cantPruneIfUserRequiredFurtherThanFinalized() { + + DataStorageConfiguration dataStorageConfiguration = + ImmutableDataStorageConfiguration.builder() + .dataStorageFormat(BONSAI) + .bonsaiMaxLayersToLoad(2L) + .unstable( + ImmutableDataStorageConfiguration.Unstable.builder() + .bonsaiTrieLogRetentionThreshold(2) + .build() + .withBonsaiTrieLogRetentionThreshold(2)) + .build(); + + mockBlockchainBase(); + + assertThrows( + IllegalArgumentException.class, + () -> + TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir)); + } + + @Test + public void exceptionWhileSavingFileStopsPruneProcess() throws IOException { + Files.delete(dataDir.resolve("database")); + + DataStorageConfiguration dataStorageConfiguration = + ImmutableDataStorageConfiguration.builder() + .dataStorageFormat(BONSAI) + .bonsaiMaxLayersToLoad(2L) + .unstable( + ImmutableDataStorageConfiguration.Unstable.builder() + .bonsaiTrieLogRetentionThreshold(2) + .build() + .withBonsaiTrieLogRetentionThreshold(2)) + .build(); + + assertThrows( + RuntimeException.class, + () -> + TrieLogHelper.prune(dataStorageConfiguration, inMemoryWorldState, blockchain, dataDir)); + + // assert all trie logs are still in the DB + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader1.getHash()).get(), + Bytes.fromHexString("0x01").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader2.getHash()).get(), + Bytes.fromHexString("0x02").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader3.getHash()).get(), + Bytes.fromHexString("0x03").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader4.getHash()).get(), + Bytes.fromHexString("0x04").toArrayUnsafe()); + assertArrayEquals( + inMemoryWorldState.getTrieLog(blockHeader5.getHash()).get(), + Bytes.fromHexString("0x05").toArrayUnsafe()); + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/storage/BonsaiWorldStateKeyValueStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/storage/BonsaiWorldStateKeyValueStorage.java index 54089c7e212..97384fd6c12 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/storage/BonsaiWorldStateKeyValueStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/storage/BonsaiWorldStateKeyValueStorage.java @@ -204,7 +204,7 @@ public Optional getTrieLog(final Hash blockHash) { return trieLogStorage.get(blockHash.toArrayUnsafe()); } - public Stream streamTrieLogKeys(final int limit) { + public Stream streamTrieLogKeys(final long limit) { return trieLogStorage.streamKeys().limit(limit); } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPruner.java index 747a82e1621..b1bf75818e9 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/bonsai/trielog/TrieLogPruner.java @@ -61,33 +61,37 @@ public TrieLogPruner( this.requireFinalizedBlock = requireFinalizedBlock; } - public void initialize() { - preloadQueue(); + public int initialize() { + return preloadQueue(); } - private void preloadQueue() { + private int preloadQueue() { LOG.atInfo() .setMessage("Loading first {} trie logs from database...") .addArgument(loadingLimit) .log(); try (final Stream trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) { final AtomicLong count = new AtomicLong(); + final AtomicLong orphansPruned = new AtomicLong(); trieLogKeys.forEach( blockHashAsBytes -> { final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes)); final Optional header = blockchain.getBlockHeader(blockHash); if (header.isPresent()) { - trieLogBlocksAndForksByDescendingBlockNumber.put(header.get().getNumber(), blockHash); + addToPruneQueue(header.get().getNumber(), blockHash); count.getAndIncrement(); } else { // prune orphaned blocks (sometimes created during block production) rootWorldStateStorage.pruneTrieLog(blockHash); + orphansPruned.getAndIncrement(); } }); + LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue()); LOG.atInfo().log("Loaded {} trie logs from database", count); - pruneFromQueue(); + return pruneFromQueue() + orphansPruned.intValue(); } catch (Exception e) { LOG.error("Error loading trie logs from database, nothing pruned", e); + return 0; } } @@ -176,8 +180,9 @@ private NoOpTrieLogPruner( } @Override - public void initialize() { + public int initialize() { // no-op + return -1; } @Override