diff --git a/beamsim.ipynb b/beamsim.ipynb index 92c1b82..7ce3223 100644 --- a/beamsim.ipynb +++ b/beamsim.ipynb @@ -348,7 +348,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": null, "id": "7d4de5f9", "metadata": {}, "outputs": [], @@ -363,6 +363,13 @@ "# - ns3-direct: NS-3 simulation with direct peer connections\n", "backend: ns3-direct\n", "\n", + "snark1_pull: true\n", + "snark1_group_once: true\n", + "snark1_half_direct: true\n", + "snark1_pull_early: false\n", + "snark1_global_smart_push: true\n", + "signature_half_direct: 4\n", + "\n", "# Network Topology Configuration. Options: direct, gossip, grid\n", "topology: gossip\n", "\n", @@ -375,21 +382,21 @@ "# Role Assignment Configuration\n", "roles:\n", " # Number of validator groups (affects parallel processing and aggregation)\n", - " group_count: 10\n", + " group_count: 8\n", " \n", " # Number of validators per group (including local aggregators)\n", - " group_validator_count: 64\n", + " group_validator_count: 1024\n", " \n", " # Number of local aggregators per group \n", - " group_local_aggregator_count: 10\n", + " group_local_aggregator_count: 10%\n", "\n", " # Total number of global aggregators (final aggregation layer)\n", - " global_aggregator_count: 1\n", + " global_aggregator_count: 100\n", "\n", "# Gossipsub Network Configuration (only applies when topology=gossip)\n", "gossip:\n", " # Target number of peers in the mesh network for each topic\n", - " mesh_n: 6\n", + " mesh_n: 8\n", " \n", " # Number of non-mesh peers to maintain connections with\n", " non_mesh_n: 4\n", @@ -415,10 +422,10 @@ " aggregation_rate_per_sec: 1000\n", " \n", " # Rate of SNARK proof recursion/aggregation (proofs per second)\n", - " snark_recursion_aggregation_rate_per_sec: 100\n", + " snark_recursion_aggregation_rate_per_sec: 10\n", " \n", " # Time to verify a single post-quantum signature\n", - " pq_signature_verification_time: 3ms\n", + " pq_signature_verification_time: 3us\n", " \n", " # Time to verify a single SNARK proof\n", " snark_proof_verification_time: 10ms\n", @@ -427,6 +434,7 @@ "network:\n", " # Take latencies from shadow atlas file\n", " gml: \"shadow-atlas.bin\"\n", + " max_bitrate: 100Mbps\n", "\"\"\")" ] }, diff --git a/cli.hpp b/cli.hpp index 627213a..3583b40 100644 --- a/cli.hpp +++ b/cli.hpp @@ -581,6 +581,12 @@ struct SimulationConfig { local_aggregation_only, "stop simulation after local aggregator generates snark1", }}; + bool snark1_global_smart_push = false; + Args::FlagBool flag_snark1_global_smart_push{{ + {"--snark1-global-smart-push"}, + snark1_global_smart_push, + "global aggregators push the first snark1 per group and ignore further ones", + }}; std::optional direct_router; std::string gml_path; Args::FlagStr flag_gml_path{{ @@ -626,6 +632,7 @@ struct SimulationConfig { flag_signature_half_direct, flag_snark1_half_direct, flag_local_aggregation_only, + flag_snark1_global_smart_push, flag_gml_path, flag_max_bitrate, flag_report, @@ -688,6 +695,7 @@ struct SimulationConfig { yaml.at({"snark1_pull_early"}).get(snark1_pull_early); yaml.at({"signature_half_direct"}).get(signature_half_direct); yaml.at({"snark1_half_direct"}).get(snark1_half_direct); + yaml.at({"snark1_global_smart_push"}).get(snark1_global_smart_push); yaml.at({"random_seed"}).get(random_seed); diff --git a/main.cpp b/main.cpp index 4f48264..6cda87b 100644 --- a/main.cpp +++ b/main.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -165,6 +166,8 @@ namespace beamsim::example { PeerIndex signature_half_direct; bool snark1_half_direct; bool stop_on_create_snark1; + // New config: smart push at global aggregators + bool snark1_global_smart_push; PeerIndex snark2_received = 0; bool done = false; std::optional> signature_duplicates; @@ -279,6 +282,17 @@ namespace beamsim::example { return false; } if (auto *ihave = std::get_if(&message->variant)) { + if (role() == Role::GlobalAggregator) { + auto source_group = getGroupFromPeerIndices(ihave->peer_indices); + bool duplicate_group = snark1_received_groups_.get(source_group); + std::cout << "[snark1_announce_recv] t=" << ms(simulator_.time()) + << " global_aggregator=" << peer_index_ + << " from_peer=" << from_peer + << " source_group=" << source_group + << " peers=" << ihave->peer_indices.ones() + << " duplicate_group=" << duplicate_group + << std::endl; + } if (pulling_.contains(ihave->peer_indices)) { return true; } @@ -290,12 +304,14 @@ namespace beamsim::example { auto source_group = getGroupFromPeerIndices(ihave->peer_indices); // If the group has already contributed, ignore this ihave - if (snark1_received_groups_.get(source_group)) { + if (snark1_received_groups_.get(source_group) or snark1_received_ihave_groups_.get(source_group)) { report(simulator_, "snark1_ihave_ignored_duplicate_group", source_group); return true; } + // Mark this group as having contributed + snark1_received_ihave_groups_.set(source_group); } auto bits1 = pulling_max_.ones(); @@ -312,6 +328,15 @@ namespace beamsim::example { send(from_peer, std::make_shared( MessageIwantSnark1{std::move(ihave->peer_indices)})); + if (role() == Role::GlobalAggregator) { + auto first_bit = pulling_max_.findOne(0).value_or(0); + auto source_group = shared_state_.roles.group_of_validator.at(first_bit); + std::cout << "[snark1_iwant_send] t=" << ms(simulator_.time()) + << " global_aggregator=" << peer_index_ + << " to_peer=" << from_peer + << " source_group=" << source_group + << " reason=better_bitfield" << std::endl; + } return true; } if (auto *iwant = std::get_if(&message->variant)) { @@ -336,6 +361,13 @@ namespace beamsim::example { assert2(snark1_direct); forward = [this, any_message] { sendSnark1(any_message); }; } + if (role() == Role::GlobalAggregator) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + std::cout << "[snark1_recv] t=" << ms(simulator_.time()) + << " aggregator=" << peer_index_ + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() << std::endl; + } onMessageSnark1(*snark1, std::move(forward)); return true; } @@ -367,6 +399,13 @@ namespace beamsim::example { signatures_seen.ones()); } if (shared_state_.snark1_pull and shared_state_.snark1_pull_early) { + if (role() == Role::LocalAggregator) { + std::cout << "[snark1_announce_send] t=" << ms(simulator_.time()) + << " local_aggregator=" << peer_index_ + << " group=" << group_index_ + << " peers_partial=" << snark1.peer_indices.ones() + << " mode=early" << std::endl; + } sendSnark1( std::make_shared(MessageIhaveSnark1{snark1.peer_indices})); } @@ -377,6 +416,14 @@ namespace beamsim::example { "snark1_sent", group_index_, snark1.peer_indices.ones()); + // Log generation of snark1 at local aggregator + if (role() == Role::LocalAggregator) { + std::cout << "[snark1_generated] t=" << ms(simulator_.time()) + << " local_aggregator=" << peer_index_ + << " group=" << group_index_ + << " peers=" << snark1.peer_indices.ones() + << std::endl; + } if (shared_state_.stop_on_create_snark1) { shared_state_.done = true; simulator_.stop(); @@ -385,6 +432,11 @@ namespace beamsim::example { _onMessageSnark1(snark1); if (shared_state_.snark1_pull) { if (not shared_state_.snark1_pull_early) { + std::cout << "[snark1_announce_send] t=" << ms(simulator_.time()) + << " local_aggregator=" << peer_index_ + << " group=" << group_index_ + << " peers=" << snark1.peer_indices.ones() + << " mode=late" << std::endl; sendSnark1(std::make_shared( MessageIhaveSnark1{snark1.peer_indices})); } @@ -399,7 +451,22 @@ namespace beamsim::example { thread_.run(simulator_, consts().snark_proof_verification_time, [this, message, forward] { - forward(); + // Smart push at global aggregators: forward and process only once per group + if (shared_state_.snark1_global_smart_push + && role() == Role::GlobalAggregator) { + auto source_group = getGroupFromPeerIndices(message.peer_indices); + if (snark1_pushed_groups_.get(source_group)) { + report(simulator_, + "snark1_smart_push_ignored_duplicate_group", + source_group); + std::cerr << "snark1_smart_push_ignored_duplicate_group " << ms(simulator_.time()) << " " << source_group << "\n"; + return; // ignore duplicate for this group + } + snark1_pushed_groups_.set(source_group); + } + if (forward) { + forward(); + } _onMessageSnark1(message); }); } @@ -409,6 +476,9 @@ namespace beamsim::example { if (want) { auto ihave = std::make_shared( MessageIhaveSnark1{message.peer_indices}); + std::cout << "[snark1_announce_send] t=" << ms(simulator_.time()) + << " local_aggregator=" << peer_index_ + << " group=" << group_index_ << std::endl; for (auto &peer_to : want.mapped()) { send(peer_to, ihave); } @@ -450,6 +520,9 @@ namespace beamsim::example { / consts().snark_recursion_aggregation_rate_per_sec), [this, snark2{std::move(snark2)}]() mutable { report(simulator_, "snark2_sent"); + std::cout << "[snark2_generated] t=" << ms(simulator_.time()) + << " global_aggregator=" << peer_index_ + << " peers=" << snark2.peer_indices.ones() << std::endl; if (kStopOnCreateSnark2) { shared_state_.done = true; simulator_.stop(); @@ -574,6 +647,9 @@ namespace beamsim::example { BitSet pulling_max_; // Track which groups have already contributed snark1 (for global aggregators) BitSet snark1_received_groups_; + BitSet snark1_received_ihave_groups_; + // Track which groups we've already pushed snark1 for (smart push at globals) + BitSet snark1_pushed_groups_; Thread thread_; }; @@ -612,6 +688,17 @@ namespace beamsim::example { if (to_peer == peer_index_) { continue; } + if (auto *m = dynamic_cast(any_message.get())) { + if (auto *snark1 = std::get_if(&m->variant)) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + std::cout << "[snark1_forward] t=" << ms(simulator_.time()) + << " from_global=" << peer_index_ + << " to_global=" << to_peer + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() + << std::endl; + } + } send(to_peer, any_message); } } @@ -643,6 +730,17 @@ namespace beamsim::example { } void sendSnark1(MessagePtr message) override { if (role() == Role::LocalAggregator) { + if (auto *m = dynamic_cast(message.get())) { + if (auto *snark1 = std::get_if(&m->variant)) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + std::cout << "[snark1_send] t=" << ms(simulator_.time()) + << " from_local=" << peer_index_ + << " to_global=" << shared_state_.directSnark1(peer_index_) + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() + << std::endl; + } + } send(shared_state_.directSnark1(peer_index_), message); } else { assert2(role() == Role::GlobalAggregator); @@ -650,6 +748,17 @@ namespace beamsim::example { if (to_peer == peer_index_) { continue; } + if (auto *m = dynamic_cast(message.get())) { + if (auto *snark1 = std::get_if(&m->variant)) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + std::cout << "[snark1_forward] t=" << ms(simulator_.time()) + << " from_global=" << peer_index_ + << " to_global=" << to_peer + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() + << std::endl; + } + } send(to_peer, message); } } @@ -721,6 +830,22 @@ namespace beamsim::example { if (snark1HalfDirect(message)) { return; } + if (auto *m = dynamic_cast(message.get())) { + if (auto *snark1 = std::get_if(&m->variant)) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + if (role() == Role::LocalAggregator) { + std::cout << "[snark1_send] t=" << ms(simulator_.time()) + << " from_local=" << peer_index_ + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() << std::endl; + } else if (role() == Role::GlobalAggregator) { + std::cout << "[snark1_forward] t=" << ms(simulator_.time()) + << " from_global=" << peer_index_ + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() << std::endl; + } + } + } gossip_.gossip(topic_snark1, message); } void sendSnark2(MessageSnark2 message) override { @@ -776,6 +901,22 @@ namespace beamsim::example { if (snark1HalfDirect(message)) { return; } + if (auto *m = dynamic_cast(message.get())) { + if (auto *snark1 = std::get_if(&m->variant)) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + if (role() == Role::LocalAggregator) { + std::cout << "[snark1_send] t=" << ms(simulator_.time()) + << " from_local=" << peer_index_ + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() << std::endl; + } else if (role() == Role::GlobalAggregator) { + std::cout << "[snark1_forward] t=" << ms(simulator_.time()) + << " from_global=" << peer_index_ + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() << std::endl; + } + } + } publish(topic_snark1, message); } void sendSnark2(MessageSnark2 message) override { @@ -796,6 +937,17 @@ namespace beamsim::example { } auto message2 = std::make_shared(message->message, message->ttl - 1); + if (auto *example = dynamic_cast(message->message.get())) { + if (auto *snark1 = std::get_if(&example->variant)) { + auto source_group = getGroupFromPeerIndices(snark1->peer_indices); + std::cout << "[snark1_forward] t=" << ms(simulator_.time()) + << " from_global=" << peer_index_ + // << " to_global=" << to_peer + << " source_group=" << source_group + << " peers=" << snark1->peer_indices.ones() + << std::endl; + } + } topics_.at(topic_index) .forwardTo(from_peer, peer_index_, [&](PeerIndex to_peer) { send(to_peer, message2); @@ -843,6 +995,7 @@ void run_simulation(const SimulationConfig &config) { .signature_half_direct = config.signature_half_direct, .snark1_half_direct = config.snark1_half_direct, .stop_on_create_snark1 = config.local_aggregation_only, + .snark1_global_smart_push = config.snark1_global_smart_push, }; beamsim::example::report(simulator, @@ -1039,3 +1192,20 @@ int main(int argc, char **argv) { return EXIT_SUCCESS; } + + + + + + + + + + + + + + + + +