Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QBFT fast recovery by frequent multicasting of RC message #8397

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,42 @@ public BesuNode createQbftNode(
return create(builder.build());
}

public BesuNode createQbftNodeWithExtraCliOptions(
final String name,
final boolean fixedPort,
final DataStorageFormat storageFormat,
final List<String> extraCliOptions)
throws IOException {
JsonRpcConfiguration rpcConfig = node.createJsonRpcWithQbftEnabledConfig(false);
rpcConfig.addRpcApi("ADMIN,TXPOOL");
if (fixedPort) {
rpcConfig.setPort(
Math.abs(name.hashCode() % 60000)
+ 1024); // Generate a consistent port for p2p based on node name
}
BesuNodeConfigurationBuilder builder =
new BesuNodeConfigurationBuilder()
.name(name)
.miningEnabled()
.devMode(false)
.jsonRpcConfiguration(rpcConfig)
.webSocketConfiguration(node.createWebSocketEnabledConfig())
.dataStorageConfiguration(
storageFormat == DataStorageFormat.FOREST
? DataStorageConfiguration.DEFAULT_FOREST_CONFIG
: DataStorageConfiguration.DEFAULT_BONSAI_CONFIG)
.genesisConfigProvider(GenesisConfigurationFactory::createQbftGenesisConfig)
.extraCLIOptions(extraCliOptions);
if (fixedPort) {
builder.p2pPort(
Math.abs(name.hashCode() % 60000)
+ 1024
+ 500); // Generate a consistent port for p2p based on node name (+ 500 to avoid
// clashing with RPC port or other nodes with a similar name)
}
return create(builder.build());
}

public BesuNode createQbftMigrationNode(
final String name, final boolean fixedPort, final DataStorageFormat storageFormat)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.tests.acceptance.bft;

import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase;
import org.hyperledger.besu.tests.acceptance.dsl.node.BesuNode;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

public class BftRecoveryAcceptanceTest extends AcceptanceTestBase {

private static final long THREE_MINUTES = Duration.of(3, ChronoUnit.MINUTES).toMillis();
private static final long TEN_SECONDS = Duration.of(10, ChronoUnit.SECONDS).toMillis();

@Test
public void shouldBeRecoverFast() throws Exception {

List<String> extraCLIOptions =
List.of("--Xqbft-fast-recovery", "--Xqbft-enable-early-round-change");

// Create a mix of Bonsai and Forest DB nodes
final BesuNode minerNode1 =
besu.createQbftNodeWithExtraCliOptions(
"miner1", true, DataStorageFormat.BONSAI, extraCLIOptions);
final BesuNode minerNode2 =
besu.createQbftNodeWithExtraCliOptions(
"miner2", true, DataStorageFormat.BONSAI, extraCLIOptions);
final BesuNode minerNode3 =
besu.createQbftNodeWithExtraCliOptions(
"miner3", true, DataStorageFormat.BONSAI, extraCLIOptions);
final BesuNode minerNode4 =
besu.createQbftNodeWithExtraCliOptions(
"miner4", true, DataStorageFormat.BONSAI, extraCLIOptions);

cluster.start(minerNode1, minerNode2, minerNode3, minerNode4);

cluster.verify(blockchain.reachesHeight(minerNode1, 20, 85));

stopNode(minerNode3);
stopNode(minerNode4);

/* Let the other two nodes go to round 5 */
Thread.sleep(THREE_MINUTES);

startNode(minerNode3);
startNode(minerNode4);

/* Mining should start in few seconds */
cluster.verify(blockchain.reachesHeight(minerNode3, 1, 20));
}

// Start a node with a delay before returning to give it time to start
private void startNode(final BesuNode node) throws InterruptedException {
cluster.startNode(node);
Thread.sleep(TEN_SECONDS);
}

// Stop a node with a delay before returning to give it time to stop
private void stopNode(final BesuNode node) throws InterruptedException {
cluster.stopNode(node);
Thread.sleep(TEN_SECONDS);
}

@AfterEach
@Override
public void tearDownAcceptanceTestBase() {
cluster.stop();
super.tearDownAcceptanceTestBase();
}
}
3 changes: 2 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@
import org.hyperledger.besu.cli.options.PermissionsOptions;
import org.hyperledger.besu.cli.options.PluginsConfigurationOptions;
import org.hyperledger.besu.cli.options.PrivacyPluginOptions;
import org.hyperledger.besu.cli.options.QBFTOptions;
import org.hyperledger.besu.cli.options.RPCOptions;
import org.hyperledger.besu.cli.options.RpcWebsocketOptions;
import org.hyperledger.besu.cli.options.SynchronizerOptions;
import org.hyperledger.besu.cli.options.TransactionPoolOptions;
import org.hyperledger.besu.cli.options.storage.DataStorageOptions;
import org.hyperledger.besu.cli.options.storage.DiffBasedSubStorageOptions;
import org.hyperledger.besu.cli.options.unstable.QBFTOptions;
import org.hyperledger.besu.cli.presynctasks.PreSynchronizationTaskRunner;
import org.hyperledger.besu.cli.presynctasks.PrivateDatabaseMigrationPreSyncTask;
import org.hyperledger.besu.cli.subcommands.PasswordSubCommand;
Expand Down Expand Up @@ -1793,6 +1793,7 @@ public BesuControllerBuilder setupControllerBuilder() {
.isRevertReasonEnabled(isRevertReasonEnabled)
.storageProvider(storageProvider)
.isEarlyRoundChangeEnabled(unstableQbftOptions.isEarlyRoundChangeEnabled())
.isFastRecoveryEnabled(unstableQbftOptions.isFastRecoveryEnabled())
.requiredBlocks(requiredBlocks)
.reorgLoggingThreshold(reorgLoggingThreshold)
.evmConfiguration(unstableEvmOptions.toDomainObject())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.cli.options.unstable;
package org.hyperledger.besu.cli.options;

import picocli.CommandLine;

Expand All @@ -38,6 +38,12 @@ public static QBFTOptions create() {
hidden = true)
private boolean enableEarlyRoundChange = false;

@CommandLine.Option(
names = {"--Xqbft-fast-recovery"},
description = "Enable fast recovery mode for QBFT consensus (experimental)",
hidden = true)
private boolean enableFastRecovery = false;

/**
* Is early round change enabled boolean.
*
Expand All @@ -46,4 +52,13 @@ public static QBFTOptions create() {
public boolean isEarlyRoundChangeEnabled() {
return enableEarlyRoundChange;
}

/**
* Is fast recovery enabled boolean.
*
* @return true if fast recovery is enabled
*/
public boolean isFastRecoveryEnabled() {
return enableFastRecovery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
/** When enabled, round changes on f+1 RC messages from higher rounds */
protected boolean isEarlyRoundChangeEnabled = false;

/** When enabled, qbft recovery for nodes coming up is faster */
protected boolean isFastRecoveryEnabled = false;

/** Instantiates a new Besu controller builder. */
protected BesuControllerBuilder() {}

Expand Down Expand Up @@ -552,6 +555,17 @@ public BesuControllerBuilder isEarlyRoundChangeEnabled(final boolean isEarlyRoun
return this;
}

/**
* set fast recovery enabled for QBFT consensus
*
* @param fastRecoveryEnabled whether to enable fast recovery
* @return the besu controller
*/
public BesuControllerBuilder isFastRecoveryEnabled(final boolean fastRecoveryEnabled) {
this.isFastRecoveryEnabled = fastRecoveryEnabled;
return this;
}

/**
* Build besu controller.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import org.hyperledger.besu.consensus.common.bft.BlockTimer;
import org.hyperledger.besu.consensus.common.bft.EthSynchronizerUpdater;
import org.hyperledger.besu.consensus.common.bft.EventMultiplexer;
import org.hyperledger.besu.consensus.common.bft.FrequentMessageMulticaster;
import org.hyperledger.besu.consensus.common.bft.MessageTracker;
import org.hyperledger.besu.consensus.common.bft.RoundTimer;
import org.hyperledger.besu.consensus.common.bft.UniqueMessageMulticaster;
import org.hyperledger.besu.consensus.common.bft.blockcreation.BftMiningCoordinator;
import org.hyperledger.besu.consensus.common.bft.blockcreation.ProposerSelector;
import org.hyperledger.besu.consensus.common.bft.network.ValidatorMulticaster;
import org.hyperledger.besu.consensus.common.bft.network.ValidatorPeers;
import org.hyperledger.besu.consensus.common.bft.protocol.BftProtocolManager;
import org.hyperledger.besu.consensus.common.bft.statemachine.BftEventHandler;
Expand Down Expand Up @@ -242,6 +244,7 @@ protected MiningCoordinator createMiningCoordinator(
// "only send once" filter applied by the UniqueMessageMulticaster.
peers = new ValidatorPeers(validatorProvider, Istanbul100SubProtocol.NAME);

final ValidatorMulticaster frequentRCMulticaster = new FrequentMessageMulticaster(peers, 5000);
final UniqueMessageMulticaster uniqueMessageMulticaster =
new UniqueMessageMulticaster(peers, qbftConfig.getGossipedHistoryLimit());

Expand All @@ -260,7 +263,10 @@ protected MiningCoordinator createMiningCoordinator(
bftExecutors),
new BlockTimer(bftEventQueue, qbftForksSchedule, bftExecutors, clock),
new QbftBlockCreatorFactoryAdaptor(blockCreatorFactory, qbftExtraDataCodec),
clock);
clock,
isEarlyRoundChangeEnabled,
isFastRecoveryEnabled,
frequentRCMulticaster);

final MessageValidatorFactory messageValidatorFactory =
new MessageValidatorFactory(proposerSelector, qbftProtocolSchedule, qbftProtocolContext);
Expand Down Expand Up @@ -301,8 +307,6 @@ protected MiningCoordinator createMiningCoordinator(
new QbftValidatorModeTransitionLoggerAdaptor(
new ValidatorModeTransitionLogger(qbftForksSchedule)));

qbftBlockHeightManagerFactory.isEarlyRoundChangeEnabled(isEarlyRoundChangeEnabled);

final QbftEventHandler qbftController =
new QbftController(
new QbftBlockchainAdaptor(blockchain),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public void initMocks() throws Exception {
when(mockControllerBuilder.isParallelTxProcessingEnabled(false))
.thenReturn(mockControllerBuilder);
when(mockControllerBuilder.isEarlyRoundChangeEnabled(false)).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.isFastRecoveryEnabled(false)).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.storageProvider(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.requiredBlocks(any())).thenReturn(mockControllerBuilder);
when(mockControllerBuilder.reorgLoggingThreshold(anyLong())).thenReturn(mockControllerBuilder);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.common.bft;

import org.hyperledger.besu.consensus.common.bft.network.ValidatorMulticaster;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The Frequent message multicaster. */
public class FrequentMessageMulticaster implements ValidatorMulticaster {
private final ValidatorMulticaster multicaster;
private final long interval;
private final ScheduledExecutorService scheduledExecutorService;
private volatile ScheduledFuture<?> scheduledTask;
private static final Logger LOG = LoggerFactory.getLogger(FrequentMessageMulticaster.class);

/**
* Constructor that specifies the interval at which to multicast messages
*
* @param multicaster Network connections to the remote validators
* @param multicastInterval Interval at which to multicast messages
*/
public FrequentMessageMulticaster(
final ValidatorMulticaster multicaster, final long multicastInterval) {
this.multicaster = multicaster;
this.interval = multicastInterval;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}

/**
* Instantiates a new Frequent message multicaster.
*
* @param multicaster the multicaster
*/
@VisibleForTesting
public FrequentMessageMulticaster(final ValidatorMulticaster multicaster) {
this(multicaster, 5000);
}

private Runnable createPeriodicMulticastTask(
final MessageData message, final Collection<Address> denylist) {
return () -> {
LOG.debug(
"Broadcasting round change every {} ms on thread {}",
interval,
Thread.currentThread().threadId());
multicaster.send(message, denylist);
};
}

@Override
public synchronized void send(final MessageData message) {
send(message, Collections.emptyList());
}

@Override
public synchronized void send(final MessageData message, final Collection<Address> denylist) {
// Cancel the existing task before scheduling a new one
stopFrequentMulticasting();

// Define the periodic multicast task
final Runnable periodicMulticast = createPeriodicMulticastTask(message, denylist);

// Schedule the periodic task
scheduledTask =
scheduledExecutorService.scheduleAtFixedRate(
periodicMulticast, 0, interval, TimeUnit.MILLISECONDS);

LOG.debug("Scheduled new frequent multicast task for message.");
}

/**
* Stop frequent multicasting.
*
* <p>Stops the current scheduled task.
*/
public synchronized void stopFrequentMulticasting() {
if (scheduledTask != null) {
LOG.debug("Cancelling existing frequent multicast task.");
scheduledTask.cancel(true);
scheduledTask = null;
}
}
}
Loading