-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement consumer handler in the Simulator #360
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Signed-off-by: georgi-l95 <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #360 +/- ##
============================================
+ Coverage 94.62% 97.37% +2.74%
- Complexity 316 360 +44
============================================
Files 69 72 +3
Lines 1228 1333 +105
Branches 84 89 +5
============================================
+ Hits 1162 1298 +136
+ Misses 55 23 -32
- Partials 11 12 +1
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few suggestions
public class BlockStreamSimulatorApp { | ||
|
||
/** Logger for this class */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to javadoc private methods or member variables
this.metricsService = requireNonNull(metricsService); | ||
this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient); | ||
this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient); | ||
this.isRunning = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you moved the assignment from line 46 here. Why move it?
default -> throw new IllegalArgumentException("Unknown SimulatorMode: " + simulatorMode); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
// Configuration | ||
private final BlockStreamConfig blockStreamConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockStreamConfig
is never read. Should it be removed?
/** | ||
* Initialize the block stream manager and load blocks into memory. | ||
*/ | ||
void init(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe change this to a default interface method like: default void init() {}
so you don't have to override with a no-op comment in BlockAsFileLargeDataSets
@Override | ||
public void init() { | ||
// Do nothing, because we don't have real initializing and loading blocks into memory for this implementation. | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should be able to remove this init()
method. See my comment in BlockStreamManager
public void completeStreaming() throws InterruptedException { | ||
consumerStreamObserver.onCompleted(); | ||
// todo(352) Find a suitable solution for removing the sleep | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure but I think Netty will continue to send BlockItems to onNext() as long as the channel is open. Right now, I think onCompleted()
will only release the starting thread waiting at line 97. You might need to close the channel in this method and then release the starting thread.
private void processBlockItems(List<BlockItem> blockItems) { | ||
blockItems.stream() | ||
.filter(BlockItem::hasBlockProof) | ||
.forEach(__ -> metricsService.get(LiveBlocksConsumed).increment()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you start the consumer up and start streaming blockitems via the producer to the BN, it looks like nothing is happening on the consumer side. Consider changing this to something like:
blockItems.stream()
.filter(BlockItem::hasBlockProof)
.forEach(blockItem -> {
metricsService.get(LiveBlocksConsumed).increment();
LOGGER.log(INFO, "Received block number: " + blockItem.getBlockProof().getBlock());
});
That should print the last block received without overwhelming the terminal. This gives the consumer parity with the producer which prints the response hashes right now.
Description:
This pull request aims to make use of the consumer mode in the simulator, by adding needed implementations.
We add:
SubscribeStreamRequest
andSubscribeStreamResponse
, which are part of thesubscribeBlockStream
rpc.Related issue(s):
Fixes #121
Notes for reviewer:
Checklist