Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ find_package(PkgConfig REQUIRED)

find_package(Boost CONFIG REQUIRED COMPONENTS algorithm filesystem outcome program_options property_tree random)
find_package(Boost.DI CONFIG REQUIRED)
find_package(Crc32c CONFIG REQUIRED)
find_package(fmt CONFIG REQUIRED)
find_package(hashtree CONFIG REQUIRED)
find_package(libp2p CONFIG REQUIRED)
Expand Down
2 changes: 1 addition & 1 deletion src/modules/networking/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ add_lean_module(networking
bootnodes
p2p::libp2p
qtils::qtils
Snappy::snappy
snappy
soralog::soralog
sszpp
validator_registry
Expand Down
18 changes: 11 additions & 7 deletions src/modules/networking/block_request_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <libp2p/host/basic_host.hpp>

#include "blockchain/block_tree.hpp"
#include "modules/networking/response_status.hpp"
#include "modules/networking/ssz_snappy.hpp"

namespace lean::modules {
Expand All @@ -24,7 +25,7 @@ namespace lean::modules {
block_tree_{std::move(block_tree)} {}

libp2p::StreamProtocols BlockRequestProtocol::getProtocolIds() const {
return {"/leanconsensus/req/blocks_by_root/1/ssz_snappy"};
return {"/leanconsensus/req/lean_blocks_by_root/1/ssz_snappy"};
}

void BlockRequestProtocol::handle(std::shared_ptr<libp2p::Stream> stream) {
Expand All @@ -43,20 +44,22 @@ namespace lean::modules {
libp2p::PeerId peer_id, BlockRequest request) {
BOOST_OUTCOME_CO_TRY(auto stream,
co_await host_->newStream(peer_id, getProtocolIds()));
BOOST_OUTCOME_CO_TRY(
co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(request)));
BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage(
stream, encodeSszSnappyFramed(request)));
BOOST_OUTCOME_CO_TRY(co_await readResponseStatus(stream));
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto response,
decodeSszSnappy<BlockResponse>(encoded));
decodeSszSnappyFramed<BlockResponse>(encoded));
co_return response;
}

libp2p::CoroOutcome<void> BlockRequestProtocol::coroRespond(
std::shared_ptr<libp2p::Stream> stream) {
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto request, decodeSszSnappy<BlockRequest>(encoded));
BOOST_OUTCOME_CO_TRY(auto request,
decodeSszSnappyFramed<BlockRequest>(encoded));
BlockResponse response;
for (auto &block_hash : request.blocks) {
BOOST_OUTCOME_CO_TRY(auto block,
Expand All @@ -65,8 +68,9 @@ namespace lean::modules {
response.blocks.push_back(std::move(block.value()));
}
}
BOOST_OUTCOME_CO_TRY(
co_await libp2p::writeVarintMessage(stream, encodeSszSnappy(response)));
BOOST_OUTCOME_CO_TRY(co_await writeResponseStatus(stream));
BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage(
stream, encodeSszSnappyFramed(response)));
co_return outcome::success();
}
} // namespace lean::modules
2 changes: 1 addition & 1 deletion src/modules/networking/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace lean::modules {
hasher.write(size).value();
hasher.write(message.topic).value();
};
if (auto uncompressed_res = snappyUncompress(message.data)) {
if (auto uncompressed_res = snappy::uncompress(message.data)) {
auto &uncompressed = uncompressed_res.value();
hash_topic();
hasher.write(MESSAGE_DOMAIN_VALID_SNAPPY).value();
Expand Down
27 changes: 27 additions & 0 deletions src/modules/networking/response_status.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <libp2p/basic/read.hpp>
#include <libp2p/basic/write.hpp>
#include <qtils/byte_arr.hpp>

namespace lean {
inline libp2p::CoroOutcome<void> writeResponseStatus(
std::shared_ptr<libp2p::basic::Writer> writer) {
qtils::ByteArr<1> status{0};
BOOST_OUTCOME_CO_TRY(co_await libp2p::write(writer, status));
co_return outcome::success();
}

inline libp2p::CoroOutcome<void> readResponseStatus(
std::shared_ptr<libp2p::basic::Reader> reader) {
qtils::ByteArr<1> status;
BOOST_OUTCOME_CO_TRY(co_await libp2p::read(reader, status));
co_return outcome::success();
}
} // namespace lean
14 changes: 12 additions & 2 deletions src/modules/networking/ssz_snappy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@

namespace lean {
auto encodeSszSnappy(const auto &t) {
return snappyCompress(encode(t).value());
return snappy::compress(encode(t).value());
}

template <typename T>
outcome::result<T> decodeSszSnappy(qtils::BytesIn compressed) {
BOOST_OUTCOME_TRY(auto uncompressed, snappyUncompress(compressed));
BOOST_OUTCOME_TRY(auto uncompressed, snappy::uncompress(compressed));
return decode<T>(uncompressed);
}

auto encodeSszSnappyFramed(const auto &t) {
return snappy::compressFramed(encode(t).value());
}

template <typename T>
outcome::result<T> decodeSszSnappyFramed(qtils::BytesIn compressed) {
BOOST_OUTCOME_TRY(auto uncompressed, snappy::uncompressFramed(compressed));
return decode<T>(uncompressed);
}
} // namespace lean
27 changes: 22 additions & 5 deletions src/modules/networking/status_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <libp2p/coro/spawn.hpp>
#include <libp2p/host/basic_host.hpp>

#include "modules/networking/response_status.hpp"
#include "modules/networking/ssz_snappy.hpp"

namespace lean::modules {
Expand Down Expand Up @@ -44,22 +45,38 @@ namespace lean::modules {
std::shared_ptr<libp2p::connection::CapableConnection> connection) {
BOOST_OUTCOME_CO_TRY(
auto stream, co_await host_->newStream(connection, getProtocolIds()));
BOOST_OUTCOME_CO_TRY(co_await coroHandle(stream));
BOOST_OUTCOME_CO_TRY(co_await write(stream));
BOOST_OUTCOME_CO_TRY(co_await readResponseStatus(stream));
BOOST_OUTCOME_CO_TRY(co_await read(stream));
co_return outcome::success();
}

libp2p::CoroOutcome<void> StatusProtocol::coroHandle(
libp2p::CoroOutcome<void> StatusProtocol::read(
std::shared_ptr<libp2p::Stream> stream) {
auto peer_id = stream->remotePeerId();
BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage(
stream, encodeSszSnappy(get_status_())));
qtils::ByteVec encoded;
BOOST_OUTCOME_CO_TRY(co_await libp2p::readVarintMessage(stream, encoded));
BOOST_OUTCOME_CO_TRY(auto status, decodeSszSnappy<StatusMessage>(encoded));
BOOST_OUTCOME_CO_TRY(auto status,
decodeSszSnappyFramed<StatusMessage>(encoded));
on_status_(messages::StatusMessageReceived{
.from_peer = peer_id,
.notification = status,
});
co_return outcome::success();
}

libp2p::CoroOutcome<void> StatusProtocol::write(
std::shared_ptr<libp2p::Stream> stream) {
BOOST_OUTCOME_CO_TRY(co_await libp2p::writeVarintMessage(
stream, encodeSszSnappyFramed(get_status_())));
co_return outcome::success();
}

libp2p::CoroOutcome<void> StatusProtocol::coroHandle(
std::shared_ptr<libp2p::Stream> stream) {
BOOST_OUTCOME_CO_TRY(co_await read(stream));
BOOST_OUTCOME_CO_TRY(co_await writeResponseStatus(stream));
BOOST_OUTCOME_CO_TRY(co_await write(stream));
co_return outcome::success();
}
} // namespace lean::modules
2 changes: 2 additions & 0 deletions src/modules/networking/status_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace lean::modules {
std::shared_ptr<libp2p::connection::CapableConnection> connection);

private:
libp2p::CoroOutcome<void> read(std::shared_ptr<libp2p::Stream> stream);
libp2p::CoroOutcome<void> write(std::shared_ptr<libp2p::Stream> stream);
libp2p::CoroOutcome<void> coroHandle(
std::shared_ptr<libp2p::Stream> stream);

Expand Down
7 changes: 7 additions & 0 deletions src/serde/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ target_link_libraries(enr
p2p::libp2p
p2p::p2p_key_validator
)

add_library(snappy INTERFACE)
target_link_libraries(snappy INTERFACE
Crc32c::crc32c
qtils::qtils
Snappy::snappy
)
124 changes: 114 additions & 10 deletions src/serde/snappy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@

#include <snappy.h>

#include <boost/endian/conversion.hpp>
#include <crc32c/crc32c.h>
#include <libp2p/common/saturating.hpp>
#include <qtils/bytes.hpp>
#include <qtils/bytestr.hpp>

namespace lean {
namespace lean::snappy {
enum class SnappyError {
UNCOMPRESS_TOO_LONG,
UNCOMPRESS_INVALID,
UNCOMPRESS_TRUNCATED,
UNCOMPRESS_UNKNOWN_IDENTIFIER,
UNCOMPRESS_UNKNOWN_TYPE,
UNCOMPRESS_CRC_MISMATCH,
};
Q_ENUM_ERROR_CODE(SnappyError) {
using E = decltype(e);
Expand All @@ -23,32 +30,129 @@ namespace lean {
return "SnappyError::UNCOMPRESS_TOO_LONG";
case E::UNCOMPRESS_INVALID:
return "SnappyError::UNCOMPRESS_INVALID";
case E::UNCOMPRESS_TRUNCATED:
return "SnappyError::UNCOMPRESS_TRUNCATED";
case E::UNCOMPRESS_UNKNOWN_IDENTIFIER:
return "SnappyError::UNCOMPRESS_UNKNOWN_IDENTIFIER";
case E::UNCOMPRESS_UNKNOWN_TYPE:
return "SnappyError::UNCOMPRESS_UNKNOWN_TYPE";
case E::UNCOMPRESS_CRC_MISMATCH:
return "SnappyError::UNCOMPRESS_CRC_MISMATCH";
}
abort();
}

inline qtils::ByteVec snappyCompress(qtils::BytesIn input) {
constexpr size_t kHeaderSize = 4;
constexpr auto kMaxBlockSize = size_t{1} << 16;
constexpr auto kDefaultMaxSize = size_t{4} << 20;

constexpr qtils::ByteArr<6> kStreamIdentifier{'s', 'N', 'a', 'P', 'p', 'Y'};

enum ChunkType : uint8_t {
Stream = 0xFF,
Compressed = 0x00,
Uncompressed = 0x01,
Padding = 0xFE,
};

inline qtils::ByteVec compress(qtils::BytesIn input) {
std::string compressed;
snappy::Compress(qtils::byte2str(input.data()), input.size(), &compressed);
::snappy::Compress(
qtils::byte2str(input.data()), input.size(), &compressed);
return qtils::ByteVec{qtils::str2byte(std::as_const(compressed))};
}

inline outcome::result<qtils::ByteVec> snappyUncompress(
qtils::BytesIn compressed, size_t max_size = 4 << 20) {
inline outcome::result<qtils::ByteVec> uncompress(
qtils::BytesIn compressed, size_t max_size = kDefaultMaxSize) {
size_t size = 0;
if (not snappy::GetUncompressedLength(
if (not ::snappy::GetUncompressedLength(
qtils::byte2str(compressed.data()), compressed.size(), &size)) {
return SnappyError::UNCOMPRESS_INVALID;
}
if (size > max_size) {
return SnappyError::UNCOMPRESS_TOO_LONG;
}
std::string uncompressed;
if (not snappy::Uncompress(qtils::byte2str(compressed.data()),
compressed.size(),
&uncompressed)) {
if (not ::snappy::Uncompress(qtils::byte2str(compressed.data()),
compressed.size(),
&uncompressed)) {
return SnappyError::UNCOMPRESS_INVALID;
}
return qtils::ByteVec{qtils::str2byte(std::as_const(uncompressed))};
}
} // namespace lean

using Crc32 = qtils::ByteArr<4>;
inline Crc32 hashCrc32(qtils::BytesIn input) {
auto v = crc32c::Crc32c(input.data(), input.size());
v = ((v >> 15) | (v << 17)) + 0xa282ead8;
Crc32 crc;
boost::endian::store_little_u32(crc.data(), v);
return crc;
}

inline qtils::ByteVec compressFramed(qtils::BytesIn input) {
qtils::ByteVec framed;
auto write_header = [&](ChunkType type, size_t size) {
framed.putUint8(type);
qtils::ByteArr<3> size_bytes;
boost::endian::store_little_u24(size_bytes.data(), size);
framed.put(size_bytes);
};
write_header(ChunkType::Stream, kStreamIdentifier.size());
framed.put(kStreamIdentifier);
while (not input.empty()) {
auto chunk = input.first(std::min(input.size(), kMaxBlockSize));
auto crc = hashCrc32(chunk);
input = input.subspan(chunk.size());
auto compressed = compress(chunk);
write_header(ChunkType::Compressed, Crc32::size() + compressed.size());
framed.put(crc);
framed.put(compressed);
}
return framed;
}

inline outcome::result<qtils::ByteVec> uncompressFramed(
qtils::BytesIn compressed, size_t max_size = kDefaultMaxSize) {
qtils::ByteVec result;
while (not compressed.empty()) {
if (compressed.size() < kHeaderSize) {
return SnappyError::UNCOMPRESS_TRUNCATED;
}
auto type = ChunkType{compressed[0]};
auto size = boost::endian::load_little_u24(compressed.data() + 1);
if (compressed.size() < kHeaderSize + size) {
return SnappyError::UNCOMPRESS_TRUNCATED;
}
auto content = compressed.subspan(kHeaderSize, size);
compressed = compressed.subspan(kHeaderSize + size);
if (type == ChunkType::Stream) {
if (qtils::ByteView{content} != kStreamIdentifier) {
return SnappyError::UNCOMPRESS_UNKNOWN_IDENTIFIER;
}
} else if (type == ChunkType::Compressed
or type == ChunkType::Uncompressed) {
qtils::ByteVec buffer;
auto expected_crc = content.first(Crc32::size());
auto uncompressed = content.subspan(Crc32::size());
if (type == ChunkType::Compressed) {
BOOST_OUTCOME_TRY(
buffer,
uncompress(uncompressed,
libp2p::saturating_sub(max_size, result.size())));
uncompressed = buffer;
}
auto actual_crc = hashCrc32(uncompressed);
if (qtils::ByteView{actual_crc} != expected_crc) {
return SnappyError::UNCOMPRESS_CRC_MISMATCH;
}
result.put(uncompressed);
} else if (type == ChunkType::Padding) {
// skip padding
} else {
return SnappyError::UNCOMPRESS_UNKNOWN_TYPE;
}
}
return result;
}
} // namespace lean::snappy
8 changes: 7 additions & 1 deletion tests/unit/serde/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
addtest(enr_test
enr_test.cpp
)

target_link_libraries(enr_test
enr
)

addtest(snappy_test
snappy_test.cpp
)
target_link_libraries(snappy_test
snappy
)

Loading
Loading