Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
919 changes: 735 additions & 184 deletions beamsim-local-aggregation-only.ipynb

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion beamsim.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def get_snark1_received(items):
return rows2


def get_signature_duplicates(items):
_, _, duplicates, signatures = filter_report(items, "signature-duplicates")[0]
avg_duplicates = duplicates / signatures
return duplicates, avg_duplicates


class Metrics:
def __init__(self, items):
rows = filter_report(items, "metrics")
Expand Down Expand Up @@ -87,7 +93,7 @@ def __init__(self, items):
]

exe = "build/beamsim"
if not os.path.exists(exe): # for docker build
if not os.path.exists(exe): # for docker build
exe = "/usr/local/bin/beamsim"
if not os.path.exists(exe):
raise FileNotFoundError(
Expand Down
75 changes: 74 additions & 1 deletion main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ namespace beamsim::example {
bool stop_on_create_snark1;
PeerIndex snark2_received = 0;
bool done = false;
std::optional<std::pair<uint32_t, uint32_t>> signature_duplicates;

PeerIndex snark1_threshold(const Roles::Group &group) const {
return group.validators.size() * consts().snark1_threshold;
Expand Down Expand Up @@ -242,6 +243,12 @@ namespace beamsim::example {
}
thread_.run(simulator_, consts().signature_time, [this] {
MessageSignature signature{peer_index_};

// For idontwant mode, attach our current signature knowledge
if (shared_state_.gossip_config.idontwant) {
signature.seen_signatures = signatures_seen;
}

_onMessageSignature(signature);
sendSignature(std::make_shared<Message>(std::move(signature)));
});
Expand Down Expand Up @@ -347,6 +354,13 @@ namespace beamsim::example {
if (not aggregating_snark1.has_value()) {
return;
}

// For gossip topology with idontwant, track signature for optimization
if (shared_state_.gossip_config.idontwant) {
// Update our knowledge of signatures we've seen
signatures_seen.set(message.peer_index);
}

aggregating_snark1->peer_indices.set(message.peer_index);
PeerIndex threshold = shared_state_.snark1_threshold(group_);
auto received = aggregating_snark1->peer_indices.ones();
Expand All @@ -355,6 +369,10 @@ namespace beamsim::example {
}
auto snark1 = std::move(aggregating_snark1.value());
aggregating_snark1.reset();
if (not shared_state_.signature_duplicates) {
shared_state_.signature_duplicates.emplace(signature_duplicates,
signatures_seen.ones());
}
thread_.run(simulator_,
timeSeconds(received / consts().aggregation_rate_per_sec),
[this, snark1{std::move(snark1)}]() mutable {
Expand Down Expand Up @@ -509,11 +527,42 @@ namespace beamsim::example {
};
}

void checkSignatureDuplicates(const MessagePtr &any_message) {
if (not aggregating_snark1.has_value()) {
return;
}
if (auto *message = dynamic_cast<Message *>(any_message.get())) {
if (auto *signature =
std::get_if<MessageSignature>(&message->variant)) {
if (signatures_seen.get(signature->peer_index)) {
++signature_duplicates;
} else {
signatures_seen.set(signature->peer_index);
assert2(signatures_seen.get(signature->peer_index));
}
}
return;
}
if (auto *message = dynamic_cast<gossip::Message *>(any_message.get())) {
for (auto &publish : message->publish) {
checkSignatureDuplicates(publish.message);
}
return;
}
if (auto *message = dynamic_cast<grid::Message *>(any_message.get())) {
checkSignatureDuplicates(message->message);
return;
}
abort();
}

SharedState &shared_state_;
GroupIndex group_index_;
const Roles::Group &group_;
bool snark2_received = false;
std::optional<MessageSnark1> aggregating_snark1;
BitSet signatures_seen;
uint32_t signature_duplicates = 0;
// TODO: remove when aggregating multiple times
size_t snark1_received = 0;
std::optional<MessageSnark2> aggregating_snark2;
Expand Down Expand Up @@ -551,6 +600,7 @@ namespace beamsim::example {

// IPeer
void onMessage(PeerIndex from_peer, MessagePtr any_message) override {
checkSignatureDuplicates(any_message);
auto forward_snark1 = [this, from_peer, any_message] {
// global aggregator forwards snark1 from local aggregator to global aggregators
if (shared_state_.roles.roles.at(from_peer) == Role::LocalAggregator
Expand Down Expand Up @@ -621,14 +671,16 @@ namespace beamsim::example {
PeerIndex index,
SharedState &shared_state,
Random &random)
: PeerBase{simulator, index, shared_state}, gossip_{*this, random,shared_state_.gossip_config} {}
: PeerBase{simulator, index, shared_state},
gossip_{*this, random, shared_state_.gossip_config} {}

// IPeer
void onStart() override {
PeerBase::onStart();
gossip_.start();
}
void onMessage(PeerIndex from_peer, MessagePtr any_message) override {
checkSignatureDuplicates(any_message);
if (onMessagePull(from_peer, any_message, nullptr)) {
return;
}
Expand All @@ -639,6 +691,10 @@ namespace beamsim::example {
auto &message = dynamic_cast<Message &>(*any_message);
if (auto *signature =
std::get_if<MessageSignature>(&message.variant)) {
// For idontwant mode, update our signature tracking
if (shared_state_.gossip_config.idontwant) {
gossip_.updateOwnSignature(signature->peer_index);
}
onMessageSignature(*signature, std::move(forward));
} else if (std::holds_alternative<MessageIhaveSnark1>(
message.variant)) {
Expand All @@ -658,6 +714,16 @@ namespace beamsim::example {
if (signatureHalfDirect(message) or signatureDirect(message)) {
return;
}

// For idontwant mode, update our signature tracking before sending
if (shared_state_.gossip_config.idontwant) {
if (auto example_msg = dynamic_cast<Message*>(message.get())) {
if (auto *signature = std::get_if<MessageSignature>(&example_msg->variant)) {
gossip_.updateOwnSignature(signature->peer_index);
}
}
}

gossip_.gossip(topicSignature(shared_state_.roles.group_of_validator.at(
peer_index_)),
std::move(message));
Expand All @@ -681,6 +747,7 @@ namespace beamsim::example {

// IPeer
void onMessage(PeerIndex from_peer, MessagePtr any_message) override {
checkSignatureDuplicates(any_message);
if (onMessagePull(from_peer, any_message, nullptr)) {
return;
}
Expand Down Expand Up @@ -899,6 +966,12 @@ void run_simulation(const SimulationConfig &config) {
done ? "SUCCESS" : "FAILURE");
}
metrics.end(simulator_time);
if (shared_state.signature_duplicates) {
auto [signature_duplicates, signatures] =
shared_state.signature_duplicates.value();
beamsim::example::report(
simulator, "signature-duplicates", signature_duplicates, signatures);
}
beamsim::example::report_flush();
};

Expand Down
40 changes: 36 additions & 4 deletions src/beamsim/example/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <beamsim/peer_index.hpp>
#include <beamsim/std_hash.hpp>
#include <bit>
#include <optional>
#include <variant>
#include <vector>

Expand All @@ -17,12 +18,16 @@ namespace beamsim::example {
decodeFrom(from, v.limbs_);
}

static auto split(PeerIndex i) {
return std::make_pair(i / limb_bits, Limb{1} << (i % limb_bits));
}

void set(PeerIndex i) {
size_t i1 = i / limb_bits, i2 = i % limb_bits;
auto [i1, mask] = split(i);
if (limbs_.size() <= i1) {
limbs_.resize(i1 + 1);
}
limbs_[i1] |= Limb{1} << i2;
limbs_.at(i1) |= mask;
}
void set(const BitSet &other) {
if (limbs_.size() < other.limbs_.size()) {
Expand All @@ -34,8 +39,8 @@ namespace beamsim::example {
}

bool get(PeerIndex i) const {
size_t i1 = i / limb_bits, i2 = i % limb_bits;
return i1 < limbs_.size() and (limbs_[i1] >> i2) == 1;
auto [i1, mask] = split(i);
return i1 < limbs_.size() and (limbs_.at(i1) & mask) != 0;
}

std::optional<PeerIndex> findOne(PeerIndex begin) const {
Expand Down Expand Up @@ -67,8 +72,33 @@ namespace beamsim::example {
std::vector<Limb> limbs_;
};

// Forward declarations for template specializations
void encodeTo(beamsim::MessageEncodeTo &to, const std::optional<BitSet> &v);
void decodeFrom(beamsim::MessageDecodeFrom &from, std::optional<BitSet> &v);

// Implementations
inline void encodeTo(beamsim::MessageEncodeTo &to, const std::optional<BitSet> &v) {
beamsim::encodeTo(to, v.has_value());
if (v.has_value()) {
encodeTo(to, v.value());
}
}

inline void decodeFrom(beamsim::MessageDecodeFrom &from, std::optional<BitSet> &v) {
bool has_value;
beamsim::decodeFrom(from, has_value);
if (has_value) {
BitSet bitset;
decodeFrom(from, bitset);
v = std::move(bitset);
} else {
v = std::nullopt;
}
}

struct MessageSignature {
PeerIndex peer_index;
std::optional<BitSet> seen_signatures; // Bitfield of signatures this peer has seen (for idontwant mode)
};
struct MessageIhaveSnark1 {
BitSet peer_indices;
Expand Down Expand Up @@ -113,6 +143,7 @@ namespace beamsim::example {
encodeTo(to, (uint8_t)variant.index());
if (auto *signature = std::get_if<MessageSignature>(&variant)) {
encodeTo(to, signature->peer_index);
encodeTo(to, signature->seen_signatures);
} else if (auto *snark1 = std::get_if<MessageIhaveSnark1>(&variant)) {
encodeTo(to, snark1->peer_indices);
} else if (auto *snark1 = std::get_if<MessageIwantSnark1>(&variant)) {
Expand All @@ -130,6 +161,7 @@ namespace beamsim::example {
case 0: {
MessageSignature signature;
decodeFrom(from, signature.peer_index);
decodeFrom(from, signature.seen_signatures);
return std::make_shared<Message>(std::move(signature));
}
case 1: {
Expand Down
Loading