Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new BlockWriter implementation for block-as-file #355

Draft
wants to merge 32 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a73f4f9
WIP adding BlockWriter conditional creation and placeholder logic
ata-nas Nov 19, 2024
2d82213
WIP adding more creation logic, cleanup required after seein what wil…
ata-nas Nov 19, 2024
827db48
WIP minor cleanup
ata-nas Nov 19, 2024
c30b391
WIP introducing PathResolver
ata-nas Nov 19, 2024
9fa5c10
WIP adding storage type env var configuration support
ata-nas Nov 19, 2024
5c0a707
WIP simple writing of block files to the fs, need to handle cases and…
ata-nas Nov 20, 2024
6126411
WIP proof of concept write blocks as file to disk
ata-nas Nov 20, 2024
945f514
WIP abstract writer becomes local writer interface
ata-nas Nov 21, 2024
be28196
WIP some refactor and additions
ata-nas Nov 21, 2024
927c5ec
WIP javadoc improvements and cleanup
ata-nas Nov 25, 2024
d1094d5
WIP smoke tests use BLOCK_AS_DIR
ata-nas Nov 25, 2024
5f5a435
Merge branch 'main' into 281-new-blockwriter-implementation-for-block…
ata-nas Nov 26, 2024
2c4fc8b
WIP additional changes after merge
ata-nas Nov 26, 2024
5daabfb
WIP address some pr comments
ata-nas Nov 26, 2024
d869521
WIP address some pr comments, new live and archive roots for block pe…
ata-nas Nov 27, 2024
b6bbcea
WIP address some pr comments, renaming a parameter for better semantics
ata-nas Nov 27, 2024
93b6101
WIP address some pr comments, removers now created via static methods
ata-nas Nov 27, 2024
856b5c1
WIP address some pr comments, path resolvers now created via static m…
ata-nas Nov 27, 2024
b2601c6
WIP address some pr comments, local dir reader now created via static…
ata-nas Nov 27, 2024
699dfa8
WIP address some pr comments, local file and noop readers now created…
ata-nas Nov 27, 2024
edc6c93
WIP address some pr comments, local file and noop writers now created…
ata-nas Nov 27, 2024
a9a8f24
WIP address some pr comments, local dir writer now created via static…
ata-nas Nov 27, 2024
87c3a97
Merge branch 'main' into 281-new-blockwriter-implementation-for-block…
ata-nas Nov 27, 2024
3c8139d
WIP perms are now internal detail
ata-nas Nov 28, 2024
6e1090b
Merge branch 'main' into 281-new-blockwriter-implementation-for-block…
ata-nas Nov 28, 2024
9fc3cff
WIP spotless
ata-nas Nov 28, 2024
3f4b669
WIP fix all tests but 1
ata-nas Nov 28, 2024
ad0eec3
WIP fix test
ata-nas Nov 28, 2024
b15f7df
WIP block as dir currently default, for now
ata-nas Nov 28, 2024
e2a09ae
WIP fix an e2e test
ata-nas Nov 28, 2024
cda9403
WIP spotless
ata-nas Nov 28, 2024
abd9196
WIP cleanup
ata-nas Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,65 @@ public static int requirePositive(final int toCheck) {
return requirePositive(toCheck, null);
}

/**
* This method asserts a given integer is a positive. An integer is positive
* if it is NOT equal to zero and is greater than zero.
*
* @param toCheck the integer to check if it is a positive power of two
* @param errorMessage the error message to be used in the exception if the
* input integer to check is not positive, if null, a default message will
* be used
* @return the number to check if it is positive
* @throws IllegalArgumentException if the input number to check is not
* positive
*/
public static int requirePositive(final int toCheck, final String errorMessage) {
if (0 >= toCheck) {
final String message = Objects.isNull(errorMessage)
? "The input integer [%d] is required be positive.".formatted(toCheck)
: errorMessage;
throw new IllegalArgumentException(message);
} else {
return toCheck;
}
}

/**
* This method asserts a given long is a positive. A long is positive
* if it is NOT equal to zero and is greater than zero.
*
* @param toCheck the long to check if it is a positive power of two
* @return the long to check if it is positive
* @throws IllegalArgumentException if the input long to check is not
* positive
*/
public static long requirePositive(final long toCheck) {
return requirePositive(toCheck, null);
}

/**
* This method asserts a given long is a positive. A long is positive
* if it is NOT equal to zero and is greater than zero.
*
* @param toCheck the long to check if it is a positive power of two
* @param errorMessage the error message to be used in the exception if the
* input long to check is not positive, if null, a default message will
* be used
* @return the long to check if it is positive
* @throws IllegalArgumentException if the input long to check is not
* positive
*/
public static long requirePositive(final long toCheck, final String errorMessage) {
if (0L >= toCheck) {
final String message = Objects.isNull(errorMessage)
? "The input long [%d] is required be positive.".formatted(toCheck)
: errorMessage;
throw new IllegalArgumentException(message);
} else {
return toCheck;
}
}

/**
* This method asserts a given long is a whole number. A long is whole
* if it is greater or equal to zero.
Expand Down Expand Up @@ -103,34 +162,11 @@ public static long requireWhole(final long toCheck) {
public static long requireWhole(final long toCheck, final String errorMessage) {
if (toCheck >= 0) {
return toCheck;
}

final String message = Objects.isNull(errorMessage)
? "The input integer [%d] is required be whole.".formatted(toCheck)
: errorMessage;
throw new IllegalArgumentException(message);
}

/**
* This method asserts a given integer is a positive. An integer is positive
* if it is NOT equal to zero and is greater than zero.
*
* @param toCheck the integer to check if it is a positive power of two
* @param errorMessage the error message to be used in the exception if the
* input integer to check is not positive, if null, a default message will
* be used
* @return the number to check if it is positive
* @throws IllegalArgumentException if the input number to check is not
* positive
*/
public static int requirePositive(final int toCheck, final String errorMessage) {
if (0 >= toCheck) {
} else {
final String message = Objects.isNull(errorMessage)
? "The input integer [%d] is required be positive.".formatted(toCheck)
? "The input integer [%d] is required be whole.".formatted(toCheck)
: errorMessage;
throw new IllegalArgumentException(message);
} else {
return toCheck;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,44 @@ void testRequirePositiveFail(final int toTest) {
.withMessage(testErrorMessage);
}

/**
* This test aims to verify that the
* {@link Preconditions#requirePositive(long)} will return the input 'toTest'
* parameter if the positive check passes. Test includes overloads.
*
* @param toTest parameterized, the number to test
*/
@ParameterizedTest
@MethodSource("com.hedera.block.common.CommonsTestUtility#positiveIntegers")
void testRequirePositiveLongPass(final long toTest) {
final Consumer<Long> asserts = actual -> assertThat(actual).isPositive().isEqualTo(toTest);

final long actual = Preconditions.requirePositive(toTest);
assertThat(actual).satisfies(asserts);

final long actualOverload = Preconditions.requirePositive(toTest, "test error message");
assertThat(actualOverload).satisfies(asserts);
}

/**
* This test aims to verify that the
* {@link Preconditions#requirePositive(long)} will throw an
* {@link IllegalArgumentException} if the positive check fails. Test
* includes overloads.
*
* @param toTest parameterized, the number to test
*/
@ParameterizedTest
@MethodSource("com.hedera.block.common.CommonsTestUtility#zeroAndNegativeIntegers")
void testRequirePositiveLongFail(final long toTest) {
assertThatIllegalArgumentException().isThrownBy(() -> Preconditions.requirePositive(toTest));

final String testErrorMessage = "test error message";
assertThatIllegalArgumentException()
.isThrownBy(() -> Preconditions.requirePositive(toTest, testErrorMessage))
.withMessage(testErrorMessage);
}

/**
* This test aims to verify that the
* {@link Preconditions#requirePowerOfTwo(int)} will return the input
Expand Down
2 changes: 2 additions & 0 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ mainModuleInfo {
testModuleInfo {
annotationProcessor("dagger.compiler")
requires("org.junit.jupiter.api")
requires("org.junit.jupiter.params")
requires("org.mockito")
requires("org.mockito.junit.jupiter")
requires("org.assertj.core")
requiresStatic("com.github.spotbugs.annotations")
}

Expand Down
2 changes: 1 addition & 1 deletion server/docker/logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ io.helidon.common.level=INFO
# Configure specific Block Node loggers
#com.hedera.block.server.producer.ProducerBlockItemObserver.level=FINE
#com.hedera.block.server.mediator.LiveStreamMediatorImpl.level=FINE
#com.hedera.block.server.persistence.storage.write.BlockAsDirWriter.level=FINE
#com.hedera.block.server.persistence.storage.write.BlockAsLocalDirWriter.level=FINE
#com.hedera.block.server.consumer.ConsumerStreamResponseObserver.level=FINE
#com.hedera.block.server.pbj.PbjBlockStreamServiceProxy.level=FINE

Expand Down
2 changes: 2 additions & 0 deletions server/docker/update-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ fi

if [ true = "$is_smoke_test" ]; then
# add smoke test variables
# @todo(#372) - default persistence type should be BLOCK_AS_LOCAL_FILE
echo "PERSISTENCE_STORAGE_TYPE=BLOCK_AS_LOCAL_DIRECTORY" >> .env
echo "MEDIATOR_RING_BUFFER_SIZE=1024" >> .env
echo "NOTIFIER_RING_BUFFER_SIZE=1024" >> .env
echo "JAVA_OPTS='-Xms4G -Xmx4G'" >> .env
Expand Down
20 changes: 11 additions & 9 deletions server/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ The default configuration allows users to quickly get up and running without hav
ease of use at the trade-off of some insecure default configuration. Most configuration settings have appropriate
defaults and can be left unchanged. It is recommended to browse the properties below and adjust to your needs.

| Environment Variable | Description | Default Value |
|:----------------------------------|:--------------------------------------------------------------------------------------------------------------|:--------------|
| PERSISTENCE_STORAGE_ROOT_PATH | The root path for the storage, if not provided will attempt to create a `data` on the working dir of the app. | |
| CONSUMER_TIMEOUT_THRESHOLD_MILLIS | Time to wait for subscribers before disconnecting in milliseconds | 1500 |
| SERVICE_DELAY_MILLIS | Service shutdown delay in milliseconds | 500 |
| MEDIATOR_RING_BUFFER_SIZE | Size of the ring buffer used by the mediator (must be a power of 2) | 67108864 |
| NOTIFIER_RING_BUFFER_SIZE | Size of the ring buffer used by the notifier (must be a power of 2) | 2048 |
| SERVER_PORT | The port the server will listen on | 8080 |
| SERVER_MAX_MESSAGE_SIZE_BYTES | The maximum size of a message frame in bytes | 1048576 |
| Environment Variable | Description | Default Value |
|:---|:---|---:|
| PERSISTENCE_STORAGE_LIVE_ROOT_PATH | The root path for the live storage. The provided path must be absolute! | |
| PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH | The root path for the archive storage. The provided path must be absolute! | |
| PERSISTENCE_STORAGE_TYPE | Type of the persistence storage | BLOCK_AS_LOCAL_FILE |
| CONSUMER_TIMEOUT_THRESHOLD_MILLIS | Time to wait for subscribers before disconnecting in milliseconds | 1500 |
| SERVICE_DELAY_MILLIS | Service shutdown delay in milliseconds | 500 |
| MEDIATOR_RING_BUFFER_SIZE | Size of the ring buffer used by the mediator (must be a power of 2) | 67108864 |
| NOTIFIER_RING_BUFFER_SIZE | Size of the ring buffer used by the notifier (must be a power of 2) | 2048 |
| SERVER_PORT | The port the server will listen on | 8080 |
| SERVER_MAX_MESSAGE_SIZE_BYTES | The maximum size of a message frame in bytes | 1048576 |
7 changes: 5 additions & 2 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

/** Constants used in the BlockNode service. */
public final class Constants {
/** Constant mapped to the semantic name of the Block Node root directory */
public static final String BLOCK_NODE_ROOT_DIRECTORY_SEMANTIC_NAME = "Block Node Root Directory";
/** Constant mapped to the semantic name of the Block Node live root directory */
public static final String BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME = "Block Node Live Root Directory";

/** Constant mapped to the semantic name of the Block Node archive root directory */
public static final String BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME = "Block Node Archive Root Directory";

/** Constant mapped to PbjProtocolProvider.CONFIG_NAME in the PBJ Helidon Plugin */
public static final String PBJ_PROTOCOL_PROVIDER_CONFIG_NAME = "pbj";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
public final class ServerMappedConfigSourceInitializer {
private static final List<ConfigMapping> MAPPINGS = List.of(
new ConfigMapping("consumer.timeoutThresholdMillis", "CONSUMER_TIMEOUT_THRESHOLD_MILLIS"),
new ConfigMapping("persistence.storage.rootPath", "PERSISTENCE_STORAGE_ROOT_PATH"),
new ConfigMapping("persistence.storage.liveRootPath", "PERSISTENCE_STORAGE_LIVE_ROOT_PATH"),
new ConfigMapping("persistence.storage.archiveRootPath", "PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH"),
new ConfigMapping("persistence.storage.type", "PERSISTENCE_STORAGE_TYPE"),
new ConfigMapping("service.delayMillis", "SERVICE_DELAY_MILLIS"),
new ConfigMapping("mediator.ringBufferSize", "MEDIATOR_RING_BUFFER_SIZE"),
new ConfigMapping("notifier.ringBufferSize", "NOTIFIER_RING_BUFFER_SIZE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public interface MetricsService {
* @param key to get a specific counter
* @return the counter
*/
Counter get(@NonNull BlockNodeMetricTypes.Counter key);
Counter get(@NonNull final BlockNodeMetricTypes.Counter key);

/**
* Use this method to get a specific gauge for the given metric type.
*
* @param key to get a specific gauge
* @return the gauge
*/
LongGauge get(@NonNull BlockNodeMetricTypes.Gauge key);
LongGauge get(@NonNull final BlockNodeMetricTypes.Gauge key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.swirlds.metrics.api.Metrics;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.EnumMap;
import java.util.Objects;
import javax.inject.Inject;

/**
Expand All @@ -29,10 +30,8 @@
* <p>Metrics are updated by calling the appropriate method on the metric object instance. For
* example, to increment a counter, call {@link Counter#increment()}.
*/
public class MetricsServiceImpl implements MetricsService {

public final class MetricsServiceImpl implements MetricsService {
private static final String CATEGORY = "hedera_block_node";

private final EnumMap<BlockNodeMetricTypes.Counter, Counter> counters =
new EnumMap<>(BlockNodeMetricTypes.Counter.class);
private final EnumMap<BlockNodeMetricTypes.Gauge, LongGauge> gauges =
Expand All @@ -45,22 +44,21 @@ public class MetricsServiceImpl implements MetricsService {
*/
@Inject
public MetricsServiceImpl(@NonNull final Metrics metrics) {
Objects.requireNonNull(metrics);
// Initialize the counters
for (BlockNodeMetricTypes.Counter counter : BlockNodeMetricTypes.Counter.values()) {
for (final BlockNodeMetricTypes.Counter counter : BlockNodeMetricTypes.Counter.values()) {
counters.put(
counter,
metrics.getOrCreate(
new Counter.Config(CATEGORY, counter.grafanaLabel())
.withDescription(counter.description())));
metrics.getOrCreate(new Counter.Config(CATEGORY, counter.grafanaLabel())
.withDescription(counter.description())));
}

// Initialize the gauges
for (BlockNodeMetricTypes.Gauge gauge : BlockNodeMetricTypes.Gauge.values()) {
for (final BlockNodeMetricTypes.Gauge gauge : BlockNodeMetricTypes.Gauge.values()) {
gauges.put(
gauge,
metrics.getOrCreate(
new LongGauge.Config(CATEGORY, gauge.grafanaLabel())
.withDescription(gauge.description())));
new LongGauge.Config(CATEGORY, gauge.grafanaLabel()).withDescription(gauge.description())));
}
}

Expand All @@ -72,8 +70,8 @@ public MetricsServiceImpl(@NonNull final Metrics metrics) {
*/
@NonNull
@Override
public Counter get(@NonNull BlockNodeMetricTypes.Counter key) {
return counters.get(key);
public Counter get(@NonNull final BlockNodeMetricTypes.Counter key) {
return counters.get(Objects.requireNonNull(key));
}

/**
Expand All @@ -84,7 +82,7 @@ public Counter get(@NonNull BlockNodeMetricTypes.Counter key) {
*/
@NonNull
@Override
public LongGauge get(@NonNull BlockNodeMetricTypes.Gauge key) {
return gauges.get(key);
public LongGauge get(@NonNull final BlockNodeMetricTypes.Gauge key) {
return gauges.get(Objects.requireNonNull(key));
}
}
Loading
Loading