diff --git a/DEPRECATED.md b/DEPRECATED.md new file mode 100644 index 0000000..0615c67 --- /dev/null +++ b/DEPRECATED.md @@ -0,0 +1,80 @@ +Module system uses subscriptions and messages.\ +For example: + +```c++ +struct Message { ... }; + +struct IModule { + // Send `Message` to subscription + virtual void dispatchMessage(std::shared_ptr message) = 0; + + // Received `Message` from subscription + virtual void onMessage(std::shared_ptr message) = 0; +}; +struct Module : IModule { + SimpleSubscription subscription_message_; + + void start() { + subscription_message_.subscribe(*se_manager_, module_internal); + } + + void dispatchMessage(std::shared_ptr message) override { + log("dispatch Message"); + dispatchDerive(subscription, message); + } + + void onMessage(std::shared_ptr message) override; +}; +void Module::onMessage(std::shared_ptr message) { + log("received Message"); +} +``` + +That code may look like boilerplate and duplicate message type name,\ +but don't try to simplify it using macro.\ +The following code is forbidden and will not pass PR review. + +```c++ +struct Message { ... }; + +struct IModule { + // Send `Message` to subscription + VIRTUAL_DISPATCH(Message); + + // Received `Message` from subscription + VIRTUAL_ON_DISPATCH(Message); +}; +struct Module : IModule { + ON_DISPATCH_SUBSCRIPTION(Message); + + void start() { + ON_DISPATCH_SUBSCRIBE(Message); + } + + DISPATCH_OVERRIDE(Message) { + log("dispatch Message"); + dispatchDerive(subscription, message); + } + + ON_DISPATCH_OVERRIDE(Message); +}; +ON_DISPATCH_IMPL(Module, Message) { + log("received Message"); +} +``` + +Don't try to preserve type name case in variable or member name.\ +Yes, it breaks relation between type name and field name.\ +Yes, you need to change case manually.\ +The following code is forbidden and will not pass PR review. + +```c++ +SimpleSubscription subscription_Message_; +``` + +Don't try to separate type name in function name.\ +The following code is forbidden and will not pass PR review. + +```c++ +void dispatch_Message(std::shared_ptr message); +``` diff --git a/src/blockchain/block_tree.hpp b/src/blockchain/block_tree.hpp index 4be1b9e..fcb912b 100644 --- a/src/blockchain/block_tree.hpp +++ b/src/blockchain/block_tree.hpp @@ -153,12 +153,6 @@ namespace lean::blockchain { */ [[nodiscard]] virtual BlockIndex lastFinalized() const = 0; - /** - * Get message for "/leanconsensus/req/status/1/ssz_snappy" protocol. - * Returns hash and slot for finalized and best blocks. - */ - virtual StatusMessage getStatusMessage() const = 0; - /** * Get `SignedBlock` for "/leanconsensus/req/blocks_by_root/1/ssz_snappy" * protocol. diff --git a/src/blockchain/impl/block_tree_impl.cpp b/src/blockchain/impl/block_tree_impl.cpp index 2b7b963..dd9c0ad 100644 --- a/src/blockchain/impl/block_tree_impl.cpp +++ b/src/blockchain/impl/block_tree_impl.cpp @@ -22,7 +22,6 @@ #include "types/block.hpp" #include "types/block_header.hpp" #include "types/signed_block.hpp" -#include "types/status_message.hpp" namespace lean::blockchain { BlockTreeImpl::SafeBlockTreeData::SafeBlockTreeData(BlockTreeData data) @@ -787,15 +786,6 @@ namespace lean::blockchain { [&](const BlockTreeData &p) { return getLastFinalizedNoLock(p); }); } - StatusMessage BlockTreeImpl::getStatusMessage() const { - auto finalized = lastFinalized(); - auto head = bestBlock(); - return StatusMessage{ - .finalized = {.root = finalized.hash, .slot = finalized.slot}, - .head = {.root = head.hash, .slot = head.slot}, - }; - } - outcome::result> BlockTreeImpl::tryGetSignedBlock( const BlockHash block_hash) const { auto header_res = getBlockHeader(block_hash); diff --git a/src/blockchain/impl/block_tree_impl.hpp b/src/blockchain/impl/block_tree_impl.hpp index 9603363..d9140c7 100644 --- a/src/blockchain/impl/block_tree_impl.hpp +++ b/src/blockchain/impl/block_tree_impl.hpp @@ -108,7 +108,6 @@ namespace lean::blockchain { BlockIndex lastFinalized() const override; - StatusMessage getStatusMessage() const override; outcome::result> tryGetSignedBlock( const BlockHash block_hash) const override; void import(std::vector blocks) override; diff --git a/src/loaders/impl/networking_loader.hpp b/src/loaders/impl/networking_loader.hpp index d452b7d..8f70501 100644 --- a/src/loaders/impl/networking_loader.hpp +++ b/src/loaders/impl/networking_loader.hpp @@ -34,8 +34,14 @@ namespace lean::loaders { std::shared_ptr> on_loading_finished_; - ON_DISPATCH_SUBSCRIPTION(SendSignedBlock); - ON_DISPATCH_SUBSCRIPTION(SendSignedVote); + SimpleSubscription + subscription_send_signed_block_; + SimpleSubscription + subscription_send_signed_vote_; public: NetworkingLoader(std::shared_ptr logsys, @@ -89,8 +95,8 @@ namespace lean::loaders { } }); - ON_DISPATCH_SUBSCRIBE(SendSignedBlock); - ON_DISPATCH_SUBSCRIBE(SendSignedVote); + subscription_send_signed_block_.subscribe(*se_manager_, module_internal); + subscription_send_signed_vote_.subscribe(*se_manager_, module_internal); se_manager_->notify(lean::EventTypes::NetworkingIsLoaded); } @@ -107,7 +113,7 @@ namespace lean::loaders { se_manager_->notify(lean::EventTypes::PeerDisconnected, msg); } - void dispatch_StatusMessageReceived( + void dispatchStatusMessageReceived( std::shared_ptr message) override { SL_TRACE(logger_, @@ -118,7 +124,7 @@ namespace lean::loaders { dispatchDerive(*se_manager_, message); } - void dispatch_SignedVoteReceived( + void dispatchSignedVoteReceived( std::shared_ptr message) override { SL_TRACE(logger_, "Dispatch SignedVoteReceived"); dispatchDerive(*se_manager_, message); diff --git a/src/loaders/impl/production_loader.hpp b/src/loaders/impl/production_loader.hpp index be6dfdd..5880381 100644 --- a/src/loaders/impl/production_loader.hpp +++ b/src/loaders/impl/production_loader.hpp @@ -132,7 +132,7 @@ namespace lean::loaders { se_manager_->notify(EventTypes::BlockProduced, msg); } - void dispatch_SendSignedBlock( + void dispatchSendSignedBlock( std::shared_ptr message) override { dispatchDerive(*se_manager_, message); } diff --git a/src/modules/networking/CMakeLists.txt b/src/modules/networking/CMakeLists.txt index 155bb80..eac0ea5 100644 --- a/src/modules/networking/CMakeLists.txt +++ b/src/modules/networking/CMakeLists.txt @@ -6,7 +6,9 @@ add_lean_module(networking SOURCE + block_request_protocol.cpp networking.cpp + status_protocol.cpp INCLUDE_DIRS ${CMAKE_SOURCE_DIR} ${CMAKE_SOURCE_DIR}/src @@ -19,4 +21,4 @@ add_lean_module(networking Snappy::snappy soralog::soralog sszpp -) \ No newline at end of file +) diff --git a/src/modules/networking/block_request_protocol.cpp b/src/modules/networking/block_request_protocol.cpp new file mode 100644 index 0000000..116217f --- /dev/null +++ b/src/modules/networking/block_request_protocol.cpp @@ -0,0 +1,72 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/networking/block_request_protocol.hpp" + +#include +#include +#include +#include + +#include "blockchain/block_tree.hpp" +#include "modules/networking/ssz_snappy.hpp" + +namespace lean::modules { + BlockRequestProtocol::BlockRequestProtocol( + std::shared_ptr io_context, + std::shared_ptr host, + qtils::SharedRef block_tree) + : io_context_{std::move(io_context)}, + host_{std::move(host)}, + block_tree_{std::move(block_tree)} {} + + libp2p::StreamProtocols BlockRequestProtocol::getProtocolIds() const { + return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"}; + } + + void BlockRequestProtocol::handle(std::shared_ptr stream) { + libp2p::coroSpawn( + *io_context_, + [self{shared_from_this()}, stream]() -> libp2p::Coro { + std::ignore = co_await self->coroRespond(stream); + }); + } + + void BlockRequestProtocol::start() { + host_->listenProtocol(shared_from_this()); + } + + libp2p::CoroOutcome BlockRequestProtocol::request( + libp2p::PeerId peer_id, BlockRequest request) { + BOOST_OUTCOME_CO_TRY(auto stream, + co_await host_->newStream(peer_id, getProtocolIds())); + BOOST_OUTCOME_CO_TRY( + co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(request))); + qtils::ByteVec encoded; + BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY(auto response, + decodeSszSnappy(encoded)); + co_return response; + } + + libp2p::CoroOutcome BlockRequestProtocol::coroRespond( + std::shared_ptr stream) { + qtils::ByteVec encoded; + BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY(auto request, decodeSszSnappy(encoded)); + BlockResponse response; + for (auto &block_hash : request.blocks) { + BOOST_OUTCOME_CO_TRY(auto block, + block_tree_->tryGetSignedBlock(block_hash)); + if (block.has_value()) { + response.blocks.push_back(std::move(block.value())); + } + } + BOOST_OUTCOME_CO_TRY( + co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(response))); + co_return outcome::success(); + } +} // namespace lean::modules diff --git a/src/modules/networking/block_request_protocol.hpp b/src/modules/networking/block_request_protocol.hpp new file mode 100644 index 0000000..09798e1 --- /dev/null +++ b/src/modules/networking/block_request_protocol.hpp @@ -0,0 +1,54 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include + +#include "modules/networking/types.hpp" + +namespace boost::asio { + class io_context; +} // namespace boost::asio + +namespace libp2p::host { + class BasicHost; +} // namespace libp2p::host + +namespace lean::blockchain { + class BlockTree; +} // namespace lean::blockchain + +namespace lean::modules { + class BlockRequestProtocol + : public std::enable_shared_from_this, + public libp2p::protocol::BaseProtocol { + public: + BlockRequestProtocol(std::shared_ptr io_context, + std::shared_ptr host, + qtils::SharedRef block_tree); + + // BaseProtocol + libp2p::StreamProtocols getProtocolIds() const override; + void handle(std::shared_ptr stream) override; + + void start(); + + libp2p::CoroOutcome request(libp2p::PeerId peer_id, + BlockRequest request); + + private: + libp2p::CoroOutcome coroRespond( + std::shared_ptr stream); + + std::shared_ptr io_context_; + std::shared_ptr host_; + qtils::SharedRef block_tree_; + }; +} // namespace lean::modules diff --git a/src/modules/networking/interfaces.hpp b/src/modules/networking/interfaces.hpp index 02c3607..8d15fdf 100644 --- a/src/modules/networking/interfaces.hpp +++ b/src/modules/networking/interfaces.hpp @@ -8,8 +8,6 @@ #include -#include "modules/shared/macro.hpp" - namespace lean::modules { struct NetworkingLoader { @@ -21,9 +19,9 @@ namespace lean::modules { virtual void dispatch_peer_disconnected( std::shared_ptr msg) = 0; - virtual void dispatch_StatusMessageReceived( + virtual void dispatchStatusMessageReceived( std::shared_ptr message) = 0; - virtual void dispatch_SignedVoteReceived( + virtual void dispatchSignedVoteReceived( std::shared_ptr message) = 0; }; @@ -34,9 +32,9 @@ namespace lean::modules { virtual void on_loading_is_finished() = 0; - virtual void on_dispatch_SendSignedBlock( + virtual void onSendSignedBlock( std::shared_ptr message) = 0; - virtual void on_dispatch_SendSignedVote( + virtual void onSendSignedVote( std::shared_ptr message) = 0; }; diff --git a/src/modules/networking/networking.cpp b/src/modules/networking/networking.cpp index 4af2856..b3bb831 100644 --- a/src/modules/networking/networking.cpp +++ b/src/modules/networking/networking.cpp @@ -19,9 +19,10 @@ #include #include "blockchain/block_tree.hpp" -#include "serde/serialization.hpp" -#include "serde/snappy.hpp" -#include "types/block_request.hpp" +#include "modules/networking/block_request_protocol.hpp" +#include "modules/networking/ssz_snappy.hpp" +#include "modules/networking/status_protocol.hpp" +#include "modules/networking/types.hpp" #include "utils/__debug_env.hpp" #include "utils/sample_peer.hpp" @@ -32,16 +33,6 @@ namespace lean::modules { return std::format("/leanconsensus/devnet0/{}/ssz_snappy", type); } - auto encodeSszSnappy(const auto &t) { - return snappyCompress(encode(t).value()); - } - - template - outcome::result decodeSszSnappy(qtils::BytesIn compressed) { - BOOST_OUTCOME_TRY(auto uncompressed, snappyUncompress(compressed)); - return decode(uncompressed); - } - libp2p::protocol::gossip::MessageId gossipMessageId( const libp2p::protocol::gossip::Message &message) { constexpr qtils::ByteArr<4> MESSAGE_DOMAIN_INVALID_SNAPPY{0, 0, 0, 0}; @@ -68,133 +59,6 @@ namespace lean::modules { return hash; } - class StatusProtocol : public std::enable_shared_from_this, - public libp2p::protocol::BaseProtocol { - public: - using GetStatus = std::function; - using OnStatus = std::function; - - StatusProtocol(std::shared_ptr io_context, - std::shared_ptr host, - GetStatus get_status, - OnStatus on_status) - : io_context_{std::move(io_context)}, - host_{std::move(host)}, - get_status_{std::move(get_status)}, - on_status_{std::move(on_status)} {} - - // BaseProtocol - libp2p::StreamProtocols getProtocolIds() const override { - return {"/leanconsensus/req/status/1/ssz_snappy"}; - } - void handle(std::shared_ptr stream) override { - libp2p::coroSpawn( - *io_context_, - [self{shared_from_this()}, stream]() -> libp2p::Coro { - std::ignore = co_await self->coroHandle(stream); - }); - } - - void start() { - host_->listenProtocol(shared_from_this()); - } - - libp2p::CoroOutcome connect( - std::shared_ptr connection) { - BOOST_OUTCOME_CO_TRY( - auto stream, co_await host_->newStream(connection, getProtocolIds())); - BOOST_OUTCOME_CO_TRY(co_await coroHandle(stream)); - co_return outcome::success(); - } - - private: - libp2p::CoroOutcome coroHandle( - std::shared_ptr stream) { - auto peer_id = stream->remotePeerId(); - BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( - stream, encodeSszSnappy(get_status_()))); - qtils::ByteVec encoded; - BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); - BOOST_OUTCOME_CO_TRY(auto status, - decodeSszSnappy(encoded)); - on_status_(messages::StatusMessageReceived{ - .from_peer = peer_id, - .notification = status, - }); - co_return outcome::success(); - } - - std::shared_ptr io_context_; - std::shared_ptr host_; - GetStatus get_status_; - OnStatus on_status_; - }; - - class BlockRequestProtocol - : public std::enable_shared_from_this, - public libp2p::protocol::BaseProtocol { - public: - BlockRequestProtocol(std::shared_ptr io_context, - std::shared_ptr host, - qtils::SharedRef block_tree) - : io_context_{std::move(io_context)}, - host_{std::move(host)}, - block_tree_{std::move(block_tree)} {} - - // BaseProtocol - libp2p::StreamProtocols getProtocolIds() const override { - return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"}; - } - void handle(std::shared_ptr stream) override { - libp2p::coroSpawn( - *io_context_, - [self{shared_from_this()}, stream]() -> libp2p::Coro { - std::ignore = co_await self->coroRespond(stream); - }); - } - - void start() { - host_->listenProtocol(shared_from_this()); - } - - libp2p::CoroOutcome request(libp2p::PeerId peer_id, - BlockRequest request) { - BOOST_OUTCOME_CO_TRY( - auto stream, co_await host_->newStream(peer_id, getProtocolIds())); - BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( - stream, encodeSszSnappy(request))); - qtils::ByteVec encoded; - BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); - BOOST_OUTCOME_CO_TRY(auto response, - decodeSszSnappy(encoded)); - co_return response; - } - - private: - libp2p::CoroOutcome coroRespond( - std::shared_ptr stream) { - qtils::ByteVec encoded; - BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); - BOOST_OUTCOME_CO_TRY(auto request, - decodeSszSnappy(encoded)); - BlockResponse response; - for (auto &block_hash : request.blocks) { - BOOST_OUTCOME_CO_TRY(auto block, - block_tree_->tryGetSignedBlock(block_hash)); - if (block.has_value()) { - response.blocks.push_back(std::move(block.value())); - } - } - BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( - stream, encodeSszSnappy(response))); - co_return outcome::success(); - } - - std::shared_ptr io_context_; - std::shared_ptr host_; - qtils::SharedRef block_tree_; - }; - NetworkingImpl::NetworkingImpl( NetworkingLoader &loader, qtils::SharedRef logging_system, @@ -276,7 +140,14 @@ namespace lean::modules { status_protocol_ = std::make_shared( io_context_, host, - [block_tree{block_tree_}]() { return block_tree->getStatusMessage(); }, + [block_tree{block_tree_}]() { + auto finalized = block_tree->lastFinalized(); + auto head = block_tree->bestBlock(); + return StatusMessage{ + .finalized = {.root = finalized.hash, .slot = finalized.slot}, + .head = {.root = head.hash, .slot = head.slot}, + }; + }, [weak_self{weak_from_this()}](messages::StatusMessageReceived message) { auto self = weak_self.lock(); if (not self) { @@ -309,7 +180,7 @@ namespace lean::modules { if (not self) { return; } - self->loader_.dispatch_SignedVoteReceived( + self->loader_.dispatchSignedVoteReceived( std::make_shared(std::move(vote))); }); @@ -343,7 +214,7 @@ namespace lean::modules { SL_INFO(logger_, "Loading is finished"); } - void NetworkingImpl::on_dispatch_SendSignedBlock( + void NetworkingImpl::onSendSignedBlock( std::shared_ptr message) { boost::asio::post(*io_context_, [self{shared_from_this()}, message] { self->gossip_blocks_topic_->publish( @@ -351,7 +222,7 @@ namespace lean::modules { }); } - void NetworkingImpl::on_dispatch_SendSignedVote( + void NetworkingImpl::onSendSignedVote( std::shared_ptr message) { boost::asio::post(*io_context_, [self{shared_from_this()}, message] { self->gossip_votes_topic_->publish( @@ -397,7 +268,7 @@ namespace lean::modules { SL_TRACE(logger_, "receiveStatus {} => request", head.slot); requestBlock(message.from_peer, head.hash); } - loader_.dispatch_StatusMessageReceived(qtils::toSharedPtr(message)); + loader_.dispatchStatusMessageReceived(qtils::toSharedPtr(message)); } void NetworkingImpl::requestBlock(const libp2p::PeerId &peer_id, diff --git a/src/modules/networking/networking.hpp b/src/modules/networking/networking.hpp index 6d866f1..45ea7bd 100644 --- a/src/modules/networking/networking.hpp +++ b/src/modules/networking/networking.hpp @@ -57,9 +57,9 @@ namespace lean::modules { // Networking void on_loaded_success() override; void on_loading_is_finished() override; - void on_dispatch_SendSignedBlock( + void onSendSignedBlock( std::shared_ptr message) override; - void on_dispatch_SendSignedVote( + void onSendSignedVote( std::shared_ptr message) override; private: diff --git a/src/modules/networking/ssz_snappy.hpp b/src/modules/networking/ssz_snappy.hpp new file mode 100644 index 0000000..910d8da --- /dev/null +++ b/src/modules/networking/ssz_snappy.hpp @@ -0,0 +1,22 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "serde/serialization.hpp" +#include "serde/snappy.hpp" + +namespace lean { + auto encodeSszSnappy(const auto &t) { + return snappyCompress(encode(t).value()); + } + + template + outcome::result decodeSszSnappy(qtils::BytesIn compressed) { + BOOST_OUTCOME_TRY(auto uncompressed, snappyUncompress(compressed)); + return decode(uncompressed); + } +} // namespace lean diff --git a/src/modules/networking/status_protocol.cpp b/src/modules/networking/status_protocol.cpp new file mode 100644 index 0000000..e07466e --- /dev/null +++ b/src/modules/networking/status_protocol.cpp @@ -0,0 +1,65 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "modules/networking/status_protocol.hpp" + +#include +#include +#include +#include + +#include "modules/networking/ssz_snappy.hpp" + +namespace lean::modules { + StatusProtocol::StatusProtocol( + std::shared_ptr io_context, + std::shared_ptr host, + GetStatus get_status, + OnStatus on_status) + : io_context_{std::move(io_context)}, + host_{std::move(host)}, + get_status_{std::move(get_status)}, + on_status_{std::move(on_status)} {} + + libp2p::StreamProtocols StatusProtocol::getProtocolIds() const { + return {"/leanconsensus/req/status/1/ssz_snappy"}; + } + + void StatusProtocol::handle(std::shared_ptr stream) { + libp2p::coroSpawn( + *io_context_, + [self{shared_from_this()}, stream]() -> libp2p::Coro { + std::ignore = co_await self->coroHandle(stream); + }); + } + + void StatusProtocol::start() { + host_->listenProtocol(shared_from_this()); + } + + libp2p::CoroOutcome StatusProtocol::connect( + std::shared_ptr connection) { + BOOST_OUTCOME_CO_TRY( + auto stream, co_await host_->newStream(connection, getProtocolIds())); + BOOST_OUTCOME_CO_TRY(co_await coroHandle(stream)); + co_return outcome::success(); + } + + libp2p::CoroOutcome StatusProtocol::coroHandle( + std::shared_ptr stream) { + auto peer_id = stream->remotePeerId(); + BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage( + stream, encodeSszSnappy(get_status_()))); + qtils::ByteVec encoded; + BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded)); + BOOST_OUTCOME_CO_TRY(auto status, decodeSszSnappy(encoded)); + on_status_(messages::StatusMessageReceived{ + .from_peer = peer_id, + .notification = status, + }); + co_return outcome::success(); + } +} // namespace lean::modules diff --git a/src/modules/networking/status_protocol.hpp b/src/modules/networking/status_protocol.hpp new file mode 100644 index 0000000..d433eef --- /dev/null +++ b/src/modules/networking/status_protocol.hpp @@ -0,0 +1,53 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include + +#include "modules/shared/networking_types.tmp.hpp" + +namespace boost::asio { + class io_context; +} // namespace boost::asio + +namespace libp2p::host { + class BasicHost; +} // namespace libp2p::host + +namespace lean::modules { + class StatusProtocol : public std::enable_shared_from_this, + public libp2p::protocol::BaseProtocol { + public: + using GetStatus = std::function; + using OnStatus = std::function; + + StatusProtocol(std::shared_ptr io_context, + std::shared_ptr host, + GetStatus get_status, + OnStatus on_status); + + // BaseProtocol + libp2p::StreamProtocols getProtocolIds() const override; + void handle(std::shared_ptr stream) override; + + void start(); + + libp2p::CoroOutcome connect( + std::shared_ptr connection); + + private: + libp2p::CoroOutcome coroHandle( + std::shared_ptr stream); + + std::shared_ptr io_context_; + std::shared_ptr host_; + GetStatus get_status_; + OnStatus on_status_; + }; +} // namespace lean::modules diff --git a/src/types/block_request.hpp b/src/modules/networking/types.hpp similarity index 77% rename from src/types/block_request.hpp rename to src/modules/networking/types.hpp index 230f52e..5e9b49a 100644 --- a/src/types/block_request.hpp +++ b/src/modules/networking/types.hpp @@ -9,6 +9,13 @@ #include "types/signed_block.hpp" namespace lean { + struct StatusMessage : ssz::ssz_container { + Checkpoint finalized; + Checkpoint head; + + SSZ_CONT(finalized, head); + }; + struct BlockRequest : ssz::ssz_container { ssz::list blocks; diff --git a/src/modules/production/interfaces.hpp b/src/modules/production/interfaces.hpp index 54ddf63..87b2a71 100644 --- a/src/modules/production/interfaces.hpp +++ b/src/modules/production/interfaces.hpp @@ -8,7 +8,6 @@ #include -#include "modules/shared/macro.hpp" #include "modules/shared/networking_types.tmp.hpp" namespace lean::messages { @@ -27,7 +26,7 @@ namespace lean::modules { virtual void dispatch_block_produced(std::shared_ptr) = 0; - virtual void dispatch_SendSignedBlock( + virtual void dispatchSendSignedBlock( std::shared_ptr message) = 0; }; diff --git a/src/modules/production/production.cpp b/src/modules/production/production.cpp index 6c199d7..5cc67b3 100644 --- a/src/modules/production/production.cpp +++ b/src/modules/production/production.cpp @@ -69,7 +69,7 @@ namespace lean::modules { loader_.dispatch_block_produced(std::make_shared(block)); // TODO(turuslan): signature - loader_.dispatch_SendSignedBlock( + loader_.dispatchSendSignedBlock( std::make_shared( SignedBlock{.message = block})); } diff --git a/src/modules/shared/macro.hpp b/src/modules/shared/macro.hpp deleted file mode 100644 index 3708504..0000000 --- a/src/modules/shared/macro.hpp +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -/** - * Macro to work with subscriptions and messages. - * - * struct Message { ... }; - * - * struct IModule { - * // Send `Message` to subscription - * VIRTUAL_DISPATCH(Message); - * - * // Received `Message` from subscription - * VIRTUAL_ON_DISPATCH(Message); - * }; - * struct Module : IModule { - * ON_DISPATCH_SUBSCRIPTION(Message); - * - * void start() { - * ON_DISPATCH_SUBSCRIBE(Message); - * } - * - * DISPATCH_OVERRIDE(Message) { - * log("dispatch Message"); - * dispatchDerive(subscription, message); - * } - * - * ON_DISPATCH_OVERRIDE(Message); - * }; - * ON_DISPATCH_IMPL(Module, Message) { - * log("received Message"); - * } - */ - -#define VIRTUAL_DISPATCH(T) \ - virtual void dispatch_##T(std::shared_ptr message) = 0 -#define DISPATCH_OVERRIDE(T) \ - void dispatch_##T(std::shared_ptr message) override - -#define VIRTUAL_ON_DISPATCH(T) \ - virtual void on_dispatch_##T(std::shared_ptr message) = 0 -#define ON_DISPATCH_OVERRIDE(T) \ - void on_dispatch_##T(std::shared_ptr message) override -#define ON_DISPATCH_IMPL(C, T) \ - void C::on_dispatch_##T(std::shared_ptr message) - - -#define ON_DISPATCH_SUBSCRIPTION(T) \ - std::shared_ptr< \ - BaseSubscriber>> \ - on_dispatch_subscription_##T -#define ON_DISPATCH_SUBSCRIBE(T) \ - on_dispatch_subscription_##T = \ - se::SubscriberCreator>:: \ - create(*se_manager_, \ - SubscriptionEngineHandlers::kTest, \ - DeriveEventType::get(), \ - [module_internal]( \ - std::nullptr_t, \ - const std::shared_ptr &msg) { \ - if (auto m = module_internal.lock()) { \ - m->on_dispatch_##T(msg); \ - } \ - }) diff --git a/src/modules/shared/networking_types.tmp.hpp b/src/modules/shared/networking_types.tmp.hpp index ccdc9be..84cfc56 100644 --- a/src/modules/shared/networking_types.tmp.hpp +++ b/src/modules/shared/networking_types.tmp.hpp @@ -8,9 +8,9 @@ #include +#include "modules/networking/types.hpp" #include "types/signed_block.hpp" #include "types/signed_vote.hpp" -#include "types/status_message.hpp" namespace lean::messages { template diff --git a/src/se/subscription.hpp b/src/se/subscription.hpp index 7926faa..2fcf049 100644 --- a/src/se/subscription.hpp +++ b/src/se/subscription.hpp @@ -107,3 +107,31 @@ namespace lean::se { } }; } // namespace lean::se + +namespace lean { + template )> + class SimpleSubscription { + public: + void subscribe(auto &subscription, std::weak_ptr weak_module) { + subscription_ = + se::SubscriberCreator>:: + create( + subscription, + SubscriptionEngineHandlers::kTest, + DeriveEventType::get(), + [weak_module](qtils::Empty, + const std::shared_ptr &message) { + if (auto module = weak_module.lock()) { + ((*module).*on_dispatch)(message); + } + }); + } + + private: + std::shared_ptr< + BaseSubscriber>> + subscription_; + }; +} // namespace lean diff --git a/src/serde/snappy.hpp b/src/serde/snappy.hpp index 5553a65..e02526c 100644 --- a/src/serde/snappy.hpp +++ b/src/serde/snappy.hpp @@ -27,14 +27,14 @@ namespace lean { abort(); } - qtils::ByteVec snappyCompress(qtils::BytesIn input) { + inline qtils::ByteVec snappyCompress(qtils::BytesIn input) { std::string compressed; snappy::Compress(qtils::byte2str(input.data()), input.size(), &compressed); return qtils::ByteVec{qtils::str2byte(std::as_const(compressed))}; } - outcome::result snappyUncompress(qtils::BytesIn compressed, - size_t max_size = 4 << 20) { + inline outcome::result snappyUncompress( + qtils::BytesIn compressed, size_t max_size = 4 << 20) { size_t size = 0; if (not snappy::GetUncompressedLength( qtils::byte2str(compressed.data()), compressed.size(), &size)) { diff --git a/src/types/status_message.hpp b/src/types/status_message.hpp deleted file mode 100644 index e06fd15..0000000 --- a/src/types/status_message.hpp +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include "types/checkpoint.hpp" - -namespace lean { - // https://github.com/leanEthereum/leanSpec/blob/main/docs/client/networking.md#status-v1 - struct StatusMessage : ssz::ssz_container { - Checkpoint finalized; - Checkpoint head; - - SSZ_CONT(finalized, head); - }; -} // namespace lean diff --git a/vcpkg-overlay/qtils/portfile.cmake b/vcpkg-overlay/qtils/portfile.cmake index 5e21bab..77ded8d 100644 --- a/vcpkg-overlay/qtils/portfile.cmake +++ b/vcpkg-overlay/qtils/portfile.cmake @@ -2,8 +2,8 @@ vcpkg_check_linkage(ONLY_STATIC_LIBRARY) vcpkg_from_github( OUT_SOURCE_PATH SOURCE_PATH REPO qdrvm/qtils - REF 4eb3f8024817d66932cec0c52e74e127c137a78a - SHA512 c02b90803a1cbf09dcb0e4707c84b3afdc83449d12ad1771e2918a3cdb40b8d01bda4f93fcb50491e35593fd060ec53c8a4b0b425dbb3df936a32312e5b99859 + REF refs/tags/v0.1.4 + SHA512 124f3711eb64df3a2e207bff8bf953ccc2dfa838f21da72a1cc77c8aec95def350e70607adf9d8e7123e56d5bffcf830052f607dfa12badc7efe463bd0be747c ) vcpkg_cmake_configure(SOURCE_PATH "${SOURCE_PATH}") vcpkg_cmake_install() diff --git a/vcpkg-overlay/qtils/vcpkg.patch b/vcpkg-overlay/qtils/vcpkg.patch deleted file mode 100644 index 0c53f04..0000000 --- a/vcpkg-overlay/qtils/vcpkg.patch +++ /dev/null @@ -1,33 +0,0 @@ -diff --git a/src/qtils/hex.hpp b/src/qtils/hex.hpp ---- a/src/qtils/hex.hpp (revision 38edff02e94e1a47b31896607bd4f49b454ee1da) -+++ b/src/qtils/hex.hpp (revision 4749d1f8332e848dbce2ac74fc1a0c1992e9cbdb) -@@ -12,13 +12,16 @@ - #include - - namespace qtils { -+ using BytesIn = std::span; - - /** - * @struct Hex - * @brief Wrapper for hex encoding of byte sequences. - */ -- struct Hex : std::span { -- using std::span::span; -+ struct Hex : BytesIn { -+ using BytesIn::span; -+ -+ Hex(BytesIn span) : BytesIn{span} {} - }; - - } // namespace qtils -diff --git a/src/qtils/enum_error_code.hpp b/src/qtils/enum_error_code.hpp ---- a/src/qtils/enum_error_code.hpp (revision 4749d1f8332e848dbce2ac74fc1a0c1992e9cbdb) -+++ b/src/qtils/enum_error_code.hpp (revision 4eb3f8024817d66932cec0c52e74e127c137a78a) -@@ -8,6 +8,7 @@ - - #include - #include -+#include - - namespace qtils { - template