Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
94043bf
Change topics to match starknet
ancazamfir Feb 18, 2025
d2410c5
Change starknet INITIAL to 0
ancazamfir Feb 18, 2025
034e17a
More topic changes
ancazamfir Feb 19, 2025
5fb98a1
Start from INITIAL if nothing in store, else store height + 1
ancazamfir Feb 19, 2025
476fd13
Cannot decrement with INITIAL 0, also matches starknet proposer
ancazamfir Feb 19, 2025
eacfcb0
Fix one test, must disable sync.
ancazamfir Feb 19, 2025
a35e7f0
Merge branch 'main' into anca/starknet_interop
romac Feb 27, 2025
64cb494
Post-merge fixes
romac Feb 27, 2025
3780e00
Improve logs
romac Feb 27, 2025
ffdc15b
Use same protocol versions as Starknet
romac Mar 3, 2025
d8a7ee9
Merge branch 'main' into anca/starknet_interop
romac Mar 13, 2025
7e5e3bd
Bunch of horrible hacks to have the nodes connect
romac Mar 13, 2025
a894356
Switch to Protobuf currently used by the sequencer
romac Mar 13, 2025
1f297db
Fix handling of sha256 topic hashes
romac Mar 17, 2025
5259eab
Fix last compatibility issues
romac Mar 17, 2025
d24f676
Remove dependency on `starknet_api`
romac Mar 17, 2025
754c1df
Merge branch 'main' into anca/starknet_interop
romac Mar 17, 2025
b4e37b9
Revert changes made to discovery
romac Mar 18, 2025
5494c5f
Only enable the network behaviours which are actually required
romac Mar 18, 2025
bd180f5
Fix unit tests
romac Mar 18, 2025
53330ea
Revert back Kad protocol name change
romac Mar 18, 2025
f6eb2fe
fix: dial backoff
bastienfaivre Mar 26, 2025
da7c1cb
feat: attempt for #849
bastienfaivre Mar 26, 2025
de6e18d
fix: channel names not static
bastienfaivre Mar 27, 2025
f07b6b4
feat: initial Starknet interop setup
bastienfaivre Mar 27, 2025
8a0638c
feat: 4-node starknet interop setup
bastienfaivre Mar 28, 2025
7e6b5e4
feat: tc rules
bastienfaivre Mar 29, 2025
89ee3a9
fix: starknet interop setup cleaning and optimization
bastienfaivre Mar 31, 2025
263dd3b
fix: README typos
bastienfaivre Mar 31, 2025
43836bf
fix: sync + README
bastienfaivre Mar 31, 2025
fcc7364
feat(code/starknet): Derive address from public key (#949)
romac Apr 1, 2025
ae15750
fix: setup with new validator IDs handling
bastienfaivre Apr 3, 2025
71df2f5
feat: latency-limit config
bastienfaivre Apr 7, 2025
c13eb10
feat: automated flexible setup
bastienfaivre Apr 9, 2025
43d85ad
fix: typo
bastienfaivre Apr 9, 2025
74355b0
feat: global management of nodes
bastienfaivre Apr 10, 2025
f2edcb8
fix: stop pkill pattern
bastienfaivre Apr 10, 2025
0b98d3c
doc: mention logs location
bastienfaivre Apr 14, 2025
4c25b59
fix: moniker + container names
bastienfaivre Apr 14, 2025
4d55617
doc: explain latency file
bastienfaivre Apr 15, 2025
9fc4995
Make sure the validator set is ordered the same way as in genesis.json
romac May 22, 2025
3704105
qa: Add `stop` command to `manage.sh`
romac May 22, 2025
24727f5
Smaller docker images
romac May 26, 2025
ef633a2
Work around sequencer not building in Docker
romac May 26, 2025
3123e4c
Use `--locked` for build command
romac May 26, 2025
8b02010
Generate `uv` project
romac May 26, 2025
b7dfa9c
Do not attempt to decode txes, store their raw bytes instead
romac May 26, 2025
3db9509
Store raw protobuf transaction
romac May 26, 2025
e5353ba
Cleanup
romac May 26, 2025
4a82b52
Less noisy logs
romac May 26, 2025
b354edc
Merge commit 'd4eb8f6e' into anca/starknet_interop
romac May 28, 2025
b4dbfd2
chore(code): Fix clippy warnings on Rust 1.87 (#1047)
romac May 28, 2025
13a1307
Adjust proposal parameters again
romac May 28, 2025
6ad93bf
feat(code): Compute and show in logs the role of the node during a ro…
romac May 27, 2025
3d07135
Cleanup
romac May 28, 2025
cc5994d
Update Malachite config template
romac May 28, 2025
d2a5456
Resolve issues from anca/starknet_interop
kirdatatjana Sep 12, 2025
78f88bd
Updated README.md file
kirdatatjana Sep 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
805 changes: 382 additions & 423 deletions code/Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ where
height,
round,
proposer,
role,
} => {
let (reply_value, rx_value) = oneshot::channel();

Expand All @@ -87,6 +88,7 @@ where
height,
round,
proposer,
role,
reply_value,
})
.await?;
Expand Down
3 changes: 3 additions & 0 deletions code/crates/app-channel/src/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use bytes::Bytes;
use derive_where::derive_where;
use malachitebft_app::consensus::Role;
use malachitebft_app::types::core::ValueOrigin;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -49,6 +50,8 @@ pub enum AppMsg<Ctx: Context> {
round: Round,
/// Proposer for that round
proposer: Ctx::Address,
/// Role that this node is playing in this round
role: Role,
/// Channel for sending back previously received undecided values to consensus
reply_value: Reply<Vec<ProposedValue<Ctx>>>,
},
Expand Down
3 changes: 2 additions & 1 deletion code/crates/app/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ fn make_gossip_config(cfg: &ConsensusConfig) -> NetworkConfig {
},
PubSubProtocol::Broadcast => GossipSubConfig::default(),
},
channel_names: malachitebft_network::ChannelNames::default(),
rpc_max_size: cfg.p2p.rpc_max_size.as_u64() as usize,
pubsub_max_size: cfg.p2p.pubsub_max_size.as_u64() as usize,
enable_sync: true,
enable_sync: cfg.vote_sync.mode.is_request_response(),
}
}
4 changes: 2 additions & 2 deletions code/crates/core-consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use malachitebft_core_types::*;

use crate::input::RequestId;
use crate::types::SignedConsensusMsg;
use crate::{ConsensusMsg, VoteExtensionError, WalEntry};
use crate::{ConsensusMsg, Role, VoteExtensionError, WalEntry};

/// Provides a way to construct the appropriate [`Resume`] value to
/// resume execution after handling an [`Effect`].
Expand Down Expand Up @@ -77,7 +77,7 @@ where
/// Consensus is starting a new round with the given proposer
///
/// Resume with: [`resume::Continue`]
StartRound(Ctx::Height, Round, Ctx::Address, resume::Continue),
StartRound(Ctx::Height, Round, Ctx::Address, Role, resume::Continue),

/// Publish a message to peers
///
Expand Down
16 changes: 14 additions & 2 deletions code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::prelude::*;
use crate::types::SignedConsensusMsg;
use crate::util::pretty::PrettyVal;
use crate::LocallyProposedValue;
use crate::Role;
use crate::VoteSyncMode;

use super::propose::on_propose;
Expand All @@ -29,12 +30,23 @@ where
#[cfg(feature = "metrics")]
metrics.round.set(round.as_i64());

info!(%height, %round, %proposer, "Starting new round");
let role = if state.address() == proposer {
Role::Proposer
} else if state.is_validator() {
Role::Validator
} else {
Role::None
};

info!(%height, %round, %proposer, ?role, "Starting new round");

state.last_signed_prevote = None;
state.last_signed_precommit = None;

perform!(co, Effect::CancelAllTimeouts(Default::default()));
perform!(
co,
Effect::StartRound(*height, *round, proposer.clone(), Default::default())
Effect::StartRound(*height, *round, proposer.clone(), role, Default::default())
);
}

Expand Down
11 changes: 11 additions & 0 deletions code/crates/core-consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ pub use malachitebft_core_types::ValuePayload;
pub use malachitebft_peer::PeerId;
pub use multiaddr::Multiaddr;

/// The role that the node is playing in the consensus protocol during a round.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Role {
/// The node is the proposer for the current round.
Proposer,
/// The node is a validator for the current round.
Validator,
/// The node is not participating in the consensus protocol for the current round.
None,
}

/// A signed consensus message, ie. a signed vote or a signed proposal.
#[derive_where(Clone, Debug, PartialEq, Eq)]
pub enum SignedConsensusMsg<Ctx: Context> {
Expand Down
11 changes: 7 additions & 4 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,16 +947,18 @@ where
Ok(r.resume_with(()))
}

Effect::StartRound(height, round, proposer, r) => {
Effect::StartRound(height, round, proposer, role, r) => {
self.wal_flush(state.phase).await?;

self.host.cast(HostMsg::StartedRound {
height,
round,
proposer,
proposer: proposer.clone(),
role,
})?;

self.tx_event.send(|| Event::StartedRound(height, round));
self.tx_event
.send(|| Event::StartedRound(height, round, proposer, role));

Ok(r.resume_with(()))
}
Expand Down Expand Up @@ -1256,7 +1258,8 @@ where
skip_all,
fields(
height = %state.consensus.height(),
round = %state.consensus.round()
round = %state.consensus.round(),
address = %state.consensus.address()
)
)]
async fn post_start(
Expand Down
3 changes: 2 additions & 1 deletion code/crates/engine/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use derive_where::derive_where;
use ractor::{ActorRef, RpcReplyPort};

use malachitebft_core_consensus::VoteExtensionError;
use malachitebft_core_consensus::{Role, VoteExtensionError};
use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId, VoteExtensions};
use malachitebft_sync::{PeerId, RawDecidedValue};

Expand All @@ -27,6 +27,7 @@ pub enum HostMsg<Ctx: Context> {
height: Ctx::Height,
round: Round,
proposer: Ctx::Address,
role: Role,
},

/// Request to build a local value to propose
Expand Down
6 changes: 4 additions & 2 deletions code/crates/engine/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub enum State<Ctx: Context> {
listen_addrs: Vec<Multiaddr>,
peers: BTreeSet<PeerId>,
output_port: OutputPort<NetworkEvent<Ctx>>,
ctrl_handle: CtrlHandle,
ctrl_handle: Box<CtrlHandle>,
recv_task: JoinHandle<()>,
inbound_requests: HashMap<InboundRequestId, request_response::InboundRequestId>,
},
Expand Down Expand Up @@ -204,7 +204,7 @@ where
listen_addrs: Vec::new(),
peers: BTreeSet::new(),
output_port: OutputPort::with_capacity(128),
ctrl_handle,
ctrl_handle: Box::new(ctrl_handle),
recv_task,
inbound_requests: HashMap::new(),
})
Expand Down Expand Up @@ -329,6 +329,8 @@ where
}

Msg::NewEvent(Event::Message(Channel::Consensus, from, data)) => {
tracing::trace!(%from, "Received consensus message: {data:?}");

let msg = match self.codec.decode(data) {
Ok(msg) => msg,
Err(e) => {
Expand Down
8 changes: 4 additions & 4 deletions code/crates/engine/src/util/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ractor::ActorProcessingErr;
use tokio::sync::broadcast;

use malachitebft_core_consensus::{
LocallyProposedValue, ProposedValue, SignedConsensusMsg, WalEntry,
LocallyProposedValue, ProposedValue, Role, SignedConsensusMsg, WalEntry,
};
use malachitebft_core_types::{CommitCertificate, Context, Round, SignedVote, ValueOrigin};

Expand Down Expand Up @@ -43,7 +43,7 @@ impl<Ctx: Context> Default for TxEvent<Ctx> {
#[derive_where(Clone, Debug)]
pub enum Event<Ctx: Context> {
StartedHeight(Ctx::Height, bool),
StartedRound(Ctx::Height, Round),
StartedRound(Ctx::Height, Round, Ctx::Address, Role),
Published(SignedConsensusMsg<Ctx>),
ProposedValue(LocallyProposedValue<Ctx>),
ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),
Expand All @@ -63,8 +63,8 @@ impl<Ctx: Context> fmt::Display for Event<Ctx> {
Event::StartedHeight(height, restart) => {
write!(f, "StartedHeight(height: {height}, restart: {restart})")
}
Event::StartedRound(height, round) => {
write!(f, "StartedRound(height: {height}, round: {round})")
Event::StartedRound(height, round, proposer, role) => {
write!(f, "StartedRound(height: {height}, round: {round}, proposer: {proposer}, role: {role:?})")
}
Event::Published(msg) => write!(f, "Published(msg: {msg:?})"),
Event::ProposedValue(value) => write!(f, "ProposedValue(value: {value:?})"),
Expand Down
5 changes: 4 additions & 1 deletion code/crates/engine/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ where
match msg {
Msg::StartedHeight(height, reply_to) => {
if state.height == height {
debug!(%height, "WAL already at height, ignoring");
debug!(%height, "WAL already at height, returning empty entries");
reply_to
.send(Ok(None))
.map_err(|e| eyre!("Failed to send reply: {e}"))?;
return Ok(());
}

Expand Down
3 changes: 2 additions & 1 deletion code/crates/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use malachitebft_discovery as discovery;
use malachitebft_metrics::Registry;
use malachitebft_sync as sync;

use crate::{Config, GossipSubConfig, PROTOCOL};
use crate::{Config, GossipSubConfig, PubSubProtocol, PROTOCOL};

#[derive(Debug)]
pub enum NetworkEvent {
Expand Down Expand Up @@ -133,6 +133,7 @@ fn message_id(message: &gossipsub::Message) -> gossipsub::MessageId {

fn gossipsub_config(config: GossipSubConfig, max_transmit_size: usize) -> gossipsub::Config {
gossipsub::ConfigBuilder::default()
.protocol_id_prefix("/meshsub")
.max_transmit_size(max_transmit_size)
.opportunistic_graft_ticks(3)
.heartbeat_interval(Duration::from_secs(1))
Expand Down
79 changes: 56 additions & 23 deletions code/crates/network/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
use core::fmt;
use std::sync::OnceLock;

use futures::channel;
use libp2p::gossipsub;
use libp2p_broadcast as broadcast;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Copy)]
pub struct ChannelNames {
pub consensus: &'static str,
pub proposal_parts: &'static str,
pub sync: &'static str,
}

impl Default for ChannelNames {
fn default() -> Self {
Self {
consensus: "consensus_votes",
proposal_parts: "consensus_proposals",
sync: "sync",
}
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Channel {
Consensus,
Expand All @@ -20,55 +39,69 @@ impl Channel {
&[Channel::Consensus, Channel::ProposalParts]
}

pub fn to_gossipsub_topic(self) -> gossipsub::IdentTopic {
gossipsub::IdentTopic::new(self.as_str())
pub fn to_gossipsub_topic(self, channel_names: ChannelNames) -> gossipsub::Sha256Topic {
// gossipsub::IdentTopic::new(self.as_str())
gossipsub::Sha256Topic::new(self.as_str(channel_names))
}

pub fn to_broadcast_topic(self) -> broadcast::Topic {
broadcast::Topic::new(self.as_str().as_bytes())
pub fn to_broadcast_topic(self, channel_names: ChannelNames) -> broadcast::Topic {
broadcast::Topic::new(self.as_str(channel_names).as_bytes())
}

pub fn as_str(&self) -> &'static str {
pub fn as_str(&self, channel_names: ChannelNames) -> &'static str {
match self {
Channel::Consensus => "/consensus",
Channel::ProposalParts => "/proposal_parts",
Channel::Sync => "/sync",
Channel::Consensus => channel_names.consensus,
Channel::ProposalParts => channel_names.proposal_parts,
Channel::Sync => channel_names.sync,
}
}

pub fn has_gossipsub_topic(topic_hash: &gossipsub::TopicHash) -> bool {
pub fn has_gossipsub_topic(
topic_hash: &gossipsub::TopicHash,
channel_names: ChannelNames,
) -> bool {
Self::all()
.iter()
.any(|channel| &channel.to_gossipsub_topic().hash() == topic_hash)
.any(|channel| &channel.to_gossipsub_topic(channel_names).hash() == topic_hash)
}

pub fn has_broadcast_topic(topic: &broadcast::Topic) -> bool {
pub fn has_broadcast_topic(topic: &broadcast::Topic, channel_names: ChannelNames) -> bool {
Self::all()
.iter()
.any(|channel| &channel.to_broadcast_topic() == topic)
.any(|channel| &channel.to_broadcast_topic(channel_names) == topic)
}

pub fn from_gossipsub_topic_hash(topic: &gossipsub::TopicHash) -> Option<Self> {
match topic.as_str() {
"/consensus" => Some(Channel::Consensus),
"/proposal_parts" => Some(Channel::ProposalParts),
"/sync" => Some(Channel::Sync),
_ => None,
pub fn from_gossipsub_topic_hash(
topic: &gossipsub::TopicHash,
channel_names: ChannelNames,
) -> Option<Self> {
if topic == &Self::Consensus.to_gossipsub_topic(channel_names).hash() {
Some(Self::Consensus)
} else if topic == &Self::ProposalParts.to_gossipsub_topic(channel_names).hash() {
Some(Self::ProposalParts)
} else if topic == &Self::Sync.to_gossipsub_topic(channel_names).hash() {
Some(Self::Sync)
} else {
None
}
}

pub fn from_broadcast_topic(topic: &broadcast::Topic) -> Option<Self> {
pub fn from_broadcast_topic(
topic: &broadcast::Topic,
channel_names: ChannelNames,
) -> Option<Self> {
match topic.as_ref() {
b"/consensus" => Some(Channel::Consensus),
b"/proposal_parts" => Some(Channel::ProposalParts),
b"/sync" => Some(Channel::Sync),
name if name == channel_names.consensus.as_bytes() => Some(Self::Consensus),
name if name == channel_names.proposal_parts.as_bytes() => Some(Self::ProposalParts),
name if name == channel_names.sync.as_bytes() => Some(Self::Sync),
_ => None,
}
}
}

impl fmt::Display for Channel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.as_str().fmt(f)
// TODO: how to use the correct channel names?
self.as_str(ChannelNames::default()).fmt(f)
}
}
Loading