Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/blockchain/block_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,6 @@ namespace lean::blockchain {
*/
[[nodiscard]] virtual BlockIndex lastFinalized() const = 0;

/**
* Get message for "/leanconsensus/req/status/1/ssz_snappy" protocol.
* Returns hash and slot for finalized and best blocks.
*/
virtual StatusMessage getStatusMessage() const = 0;

/**
* Get `SignedBlock` for "/leanconsensus/req/blocks_by_root/1/ssz_snappy"
* protocol.
Expand Down
10 changes: 0 additions & 10 deletions src/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "types/block.hpp"
#include "types/block_header.hpp"
#include "types/signed_block.hpp"
#include "types/status_message.hpp"

namespace lean::blockchain {
BlockTreeImpl::SafeBlockTreeData::SafeBlockTreeData(BlockTreeData data)
Expand Down Expand Up @@ -794,15 +793,6 @@ namespace lean::blockchain {
[&](const BlockTreeData &p) { return getLastFinalizedNoLock(p); });
}

StatusMessage BlockTreeImpl::getStatusMessage() const {
auto finalized = lastFinalized();
auto head = bestBlock();
return StatusMessage{
.finalized = {.root = finalized.hash, .slot = finalized.slot},
.head = {.root = head.hash, .slot = head.slot},
};
}

outcome::result<std::optional<SignedBlock>> BlockTreeImpl::tryGetSignedBlock(
const BlockHash block_hash) const {
auto header_res = getBlockHeader(block_hash);
Expand Down
1 change: 0 additions & 1 deletion src/blockchain/impl/block_tree_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ namespace lean::blockchain {

BlockIndex lastFinalized() const override;

StatusMessage getStatusMessage() const override;
outcome::result<std::optional<SignedBlock>> tryGetSignedBlock(
const BlockHash block_hash) const override;
void import(std::vector<SignedBlock> blocks) override;
Expand Down
18 changes: 12 additions & 6 deletions src/loaders/impl/networking_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ namespace lean::loaders {

std::shared_ptr<BaseSubscriber<qtils::Empty>> on_loading_finished_;

ON_DISPATCH_SUBSCRIPTION(SendSignedBlock);
ON_DISPATCH_SUBSCRIPTION(SendSignedVote);
SimpleSubscription<messages::SendSignedBlock,
modules::Networking,
&modules::Networking::onDispatchSendSignedBlock>
subscriptionSendSignedBlock;
SimpleSubscription<messages::SendSignedVote,
modules::Networking,
&modules::Networking::onDispatchSendSignedVote>
subscriptionSendSignedVote;

public:
NetworkingLoader(std::shared_ptr<log::LoggingSystem> logsys,
Expand Down Expand Up @@ -89,8 +95,8 @@ namespace lean::loaders {
}
});

ON_DISPATCH_SUBSCRIBE(SendSignedBlock);
ON_DISPATCH_SUBSCRIBE(SendSignedVote);
subscriptionSendSignedBlock.subscribe(*se_manager_, module_internal);
subscriptionSendSignedVote.subscribe(*se_manager_, module_internal);

se_manager_->notify(lean::EventTypes::NetworkingIsLoaded);
}
Expand All @@ -107,7 +113,7 @@ namespace lean::loaders {
se_manager_->notify(lean::EventTypes::PeerDisconnected, msg);
}

void dispatch_StatusMessageReceived(
void dispatchStatusMessageReceived(
std::shared_ptr<const messages::StatusMessageReceived> message)
override {
SL_TRACE(logger_,
Expand All @@ -118,7 +124,7 @@ namespace lean::loaders {
dispatchDerive(*se_manager_, message);
}

void dispatch_SignedVoteReceived(
void dispatchSignedVoteReceived(
std::shared_ptr<const messages::SignedVoteReceived> message) override {
SL_TRACE(logger_, "Dispatch SignedVoteReceived");
dispatchDerive(*se_manager_, message);
Expand Down
2 changes: 1 addition & 1 deletion src/loaders/impl/production_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ namespace lean::loaders {
se_manager_->notify(EventTypes::BlockProduced, msg);
}

void dispatch_SendSignedBlock(
void dispatchSendSignedBlock(
std::shared_ptr<const messages::SendSignedBlock> message) override {
dispatchDerive(*se_manager_, message);
}
Expand Down
4 changes: 3 additions & 1 deletion src/modules/networking/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

add_lean_module(networking
SOURCE
block_request_protocol.cpp
networking.cpp
status_protocol.cpp
INCLUDE_DIRS
${CMAKE_SOURCE_DIR}
${CMAKE_SOURCE_DIR}/src
Expand All @@ -19,4 +21,4 @@ add_lean_module(networking
Snappy::snappy
soralog::soralog
sszpp
)
)
72 changes: 72 additions & 0 deletions src/modules/networking/block_request_protocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#include "modules/networking/block_request_protocol.hpp"

#include <libp2p/basic/read_varint.hpp>
#include <libp2p/basic/write_varint.hpp>
#include <libp2p/coro/spawn.hpp>
#include <libp2p/host/basic_host.hpp>

#include "blockchain/block_tree.hpp"
#include "modules/networking/ssz_snappy.hpp"

namespace lean::modules {
BlockRequestProtocol::BlockRequestProtocol(
std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<libp2p::host::BasicHost> host,
qtils::SharedRef<blockchain::BlockTree> block_tree)
: io_context_{std::move(io_context)},
host_{std::move(host)},
block_tree_{std::move(block_tree)} {}

libp2p::StreamProtocols BlockRequestProtocol::getProtocolIds() const {
return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"};
}

void BlockRequestProtocol::handle(std::shared_ptr<libp2p::Stream> stream) {
libp2p::coroSpawn(
*io_context_,
[self{shared_from_this()}, stream]() -> libp2p::Coro<void> {
std::ignore = co_await self->coroRespond(stream);
});
}

void BlockRequestProtocol::start() {
host_->listenProtocol(shared_from_this());
}

libp2p::CoroOutcome<BlockResponse> BlockRequestProtocol::request(
libp2p::PeerId peer_id, BlockRequest request) {
BOOST_OUTCOME_CO_TRY(auto stream,
co_await host_->newStream(peer_id, getProtocolIds()));
BOOST_OUTCOME_CO_TRY(
co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(request)));
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto response,
decodeSszSnappy<BlockResponse>(encoded));
co_return response;
}

libp2p::CoroOutcome<void> BlockRequestProtocol::coroRespond(
std::shared_ptr<libp2p::Stream> stream) {
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto request, decodeSszSnappy<BlockRequest>(encoded));
BlockResponse response;
for (auto &block_hash : request.blocks) {
BOOST_OUTCOME_CO_TRY(auto block,
block_tree_->tryGetSignedBlock(block_hash));
if (block.has_value()) {
response.blocks.push_back(std::move(block.value()));
}
}
BOOST_OUTCOME_CO_TRY(
co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(response)));
co_return outcome::success();
}
} // namespace lean::modules
52 changes: 52 additions & 0 deletions src/modules/networking/block_request_protocol.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#include <memory>

#include <libp2p/protocol/base_protocol.hpp>
#include <qtils/shared_ref.hpp>

#include "modules/networking/types.hpp"

namespace boost::asio {
class io_context;
} // namespace boost::asio

namespace libp2p::host {
class BasicHost;
} // namespace libp2p::host

namespace lean::blockchain {
class BlockTree;
} // namespace lean::blockchain

namespace lean::modules {
class BlockRequestProtocol
: public std::enable_shared_from_this<BlockRequestProtocol>,
public libp2p::protocol::BaseProtocol {
public:
BlockRequestProtocol(std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<libp2p::host::BasicHost> host,
qtils::SharedRef<blockchain::BlockTree> block_tree);

// BaseProtocol
libp2p::StreamProtocols getProtocolIds() const override;
void handle(std::shared_ptr<libp2p::Stream> stream) override;

void start();

libp2p::CoroOutcome<BlockResponse> request(libp2p::PeerId peer_id,
BlockRequest request);

private:
libp2p::CoroOutcome<void> coroRespond(
std::shared_ptr<libp2p::Stream> stream);

std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<libp2p::host::BasicHost> host_;
qtils::SharedRef<blockchain::BlockTree> block_tree_;
};
} // namespace lean::modules
10 changes: 4 additions & 6 deletions src/modules/networking/interfaces.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

#include <modules/shared/networking_types.tmp.hpp>

#include "modules/shared/macro.hpp"

namespace lean::modules {

struct NetworkingLoader {
Expand All @@ -21,9 +19,9 @@ namespace lean::modules {
virtual void dispatch_peer_disconnected(
std::shared_ptr<const messages::PeerDisconnectedMessage> msg) = 0;

virtual void dispatch_StatusMessageReceived(
virtual void dispatchStatusMessageReceived(
std::shared_ptr<const messages::StatusMessageReceived> message) = 0;
virtual void dispatch_SignedVoteReceived(
virtual void dispatchSignedVoteReceived(
std::shared_ptr<const messages::SignedVoteReceived> message) = 0;
};

Expand All @@ -34,9 +32,9 @@ namespace lean::modules {

virtual void on_loading_is_finished() = 0;

virtual void on_dispatch_SendSignedBlock(
virtual void onDispatchSendSignedBlock(
std::shared_ptr<const messages::SendSignedBlock> message) = 0;
virtual void on_dispatch_SendSignedVote(
virtual void onDispatchSendSignedVote(
std::shared_ptr<const messages::SendSignedVote> message) = 0;
};

Expand Down
Loading
Loading