Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/bitvm2/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ impl FromStr for Actor {
"Committee" => Ok(Actor::Committee),
"Operator" => Ok(Actor::Operator),
"Challenger" => Ok(Actor::Challenger),
"Relayer" => Ok(Actor::Relayer),
"All" => Ok(Actor::All),
_ => Err(()),
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/bitvm2/src/verifier/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub fn sign_disprove(
#[cfg(test)]
mod tests {
#[test]
#[ignore]
fn test_extract_proof() {
use crate::{
committee, operator,
Expand Down
72 changes: 55 additions & 17 deletions node/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,11 @@ pub async fn recv_and_dispatch(
id: MessageId,
message: &[u8],
) -> Result<(), Box<dyn std::error::Error>> {
let message: GOATMessage = serde_json::from_slice(message)?;
tracing::info!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(message),
"Got message: {}:{} with id: {} from peer: {:?}",
&message.actor.to_string(),
String::from_utf8_lossy(&message.content),
id,
peer_id
);
Expand All @@ -203,18 +205,17 @@ pub async fn recv_and_dispatch(

return Ok(());
}
let message: GOATMessage = serde_json::from_slice(message)?;
println!("Received message: {:?}", message);
if message.actor != actor && message.actor != Actor::All && actor != Actor::Relayer {
return Ok(());
}
println!("Handle message: {:?}", message);
// // no need to check actor here, it's matched in the subsequent match block.
// if message.actor != actor && message.actor != Actor::All && actor != Actor::Relayer {
// return Ok(());
// }
let content: GOATMessageContent = message.to_typed()?;
// TODO: validate message
match (content, actor) {
// pegin
// CreateInstance sent by bootnode
(GOATMessageContent::CreateInstance(receive_data), Actor::Committee) => {
tracing::info!("Handle CreateInstance");
let instance_id = receive_data.instance_id;
let master_key = CommitteeMasterKey::new(env::get_bitvm_key()?);
let keypair = master_key.keypair_for_instance(instance_id);
Expand All @@ -232,13 +233,20 @@ pub async fn recv_and_dispatch(
send_to_peer(swarm, GOATMessage::from_typed(Actor::All, &message_content)?)?;
}
(GOATMessageContent::CreateGraphPrepare(receive_data), Actor::Operator) => {
tracing::info!("Handle CreateGraphPrepare");
store_committee_pubkeys(
client,
receive_data.instance_id,
receive_data.committee_member_pubkey,
)
.await?;
let collected_keys = get_committee_pubkeys(client, receive_data.instance_id).await?;
tracing::info!(
"instance {}, {}/{} committee-public-key collected",
receive_data.instance_id,
collected_keys.len(),
receive_data.committee_members_num
);
if collected_keys.len() == receive_data.committee_members_num
&& should_generate_graph(client, &receive_data).await?
{
Expand Down Expand Up @@ -294,6 +302,7 @@ pub async fn recv_and_dispatch(
};
}
(GOATMessageContent::CreateGraph(receive_data), Actor::Committee) => {
tracing::info!("Handle CreateGraph");
store_graph(
client,
receive_data.instance_id,
Expand All @@ -318,6 +327,7 @@ pub async fn recv_and_dispatch(
send_to_peer(swarm, GOATMessage::from_typed(Actor::Committee, &message_content)?)?;
}
(GOATMessageContent::NonceGeneration(receive_data), Actor::Committee) => {
tracing::info!("Handle NonceGeneration");
store_committee_pub_nonces(
client,
receive_data.instance_id,
Expand All @@ -326,17 +336,24 @@ pub async fn recv_and_dispatch(
receive_data.pub_nonces,
)
.await?;
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
let master_key = CommitteeMasterKey::new(env::get_bitvm_key()?);
let keypair = master_key.keypair_for_instance(receive_data.instance_id);
let nonces =
master_key.nonces_for_graph(receive_data.instance_id, receive_data.graph_id);
let sec_nonces: [SecNonce; COMMITTEE_PRE_SIGN_NUM] =
std::array::from_fn(|i| nonces[i].0.clone());
let collected_pub_nonces =
get_committee_pub_nonces(client, receive_data.instance_id, receive_data.graph_id)
.await?;
tracing::info!(
"graph {}, {}/{} committee-pub-nonces-pack collected",
receive_data.graph_id,
collected_pub_nonces.len(),
receive_data.committee_members_num
);
if collected_pub_nonces.len() == receive_data.committee_members_num {
let graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
let master_key = CommitteeMasterKey::new(env::get_bitvm_key()?);
let keypair = master_key.keypair_for_instance(receive_data.instance_id);
let nonces =
master_key.nonces_for_graph(receive_data.instance_id, receive_data.graph_id);
let sec_nonces: [SecNonce; COMMITTEE_PRE_SIGN_NUM] =
std::array::from_fn(|i| nonces[i].0.clone());
let agg_nonces = nonces_aggregation(collected_pub_nonces);
let committee_partial_sigs =
committee_pre_sign(keypair, sec_nonces, agg_nonces.clone(), &graph)?;
Expand All @@ -352,6 +369,7 @@ pub async fn recv_and_dispatch(
};
}
(GOATMessageContent::CommitteePresign(receive_data), Actor::Operator) => {
tracing::info!("Handle CommitteePresign");
if Some((receive_data.instance_id, receive_data.graph_id))
== statics::current_processing_graph()
{
Expand All @@ -369,6 +387,12 @@ pub async fn recv_and_dispatch(
receive_data.graph_id,
)
.await?;
tracing::info!(
"graph {}, {}/{} committee-partial-sigs-pack collected",
receive_data.graph_id,
collected_partial_sigs.len(),
receive_data.committee_members_num
);
if collected_partial_sigs.len() == receive_data.committee_members_num {
let mut grouped_partial_sigs: [Vec<PartialSignature>; COMMITTEE_PRE_SIGN_NUM] =
Default::default();
Expand Down Expand Up @@ -419,6 +443,7 @@ pub async fn recv_and_dispatch(
};
}
(GOATMessageContent::GraphFinalize(receive_data), _) => {
tracing::info!("Handle GraphFinalize");
// TODO: validate graph & ipfs
store_graph(
client,
Expand All @@ -442,6 +467,7 @@ pub async fn recv_and_dispatch(
// peg-out
// KickoffReady sent by relayer
(GOATMessageContent::KickoffReady(receive_data), Actor::Operator) => {
tracing::info!("Handle KickoffReady");
let mut graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if graph.parameters.operator_pubkey == env::get_node_pubkey()?
Expand Down Expand Up @@ -473,6 +499,7 @@ pub async fn recv_and_dispatch(
}
// KickoffSent sent by relayer
(GOATMessageContent::KickoffSent(receive_data), Actor::Challenger) => {
tracing::info!("Handle KickoffSent");
let mut graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if should_challenge(
Expand Down Expand Up @@ -512,6 +539,7 @@ pub async fn recv_and_dispatch(
}
// Take1Ready sent by relayer
(GOATMessageContent::Take1Ready(receive_data), Actor::Operator) => {
tracing::info!("Handle Take1Ready");
let mut graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if graph.parameters.operator_pubkey == env::get_node_pubkey()?
Expand Down Expand Up @@ -543,6 +571,7 @@ pub async fn recv_and_dispatch(
// ChallengeSent sent by challenger
// if challenger
(GOATMessageContent::ChallengeSent(receive_data), Actor::Operator) => {
tracing::info!("Handle ChallengeSent");
let mut graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if graph.parameters.operator_pubkey == env::get_node_pubkey()?
Expand Down Expand Up @@ -582,6 +611,7 @@ pub async fn recv_and_dispatch(
}
// Take2Ready sent by relayer
(GOATMessageContent::Take2Ready(receive_data), Actor::Operator) => {
tracing::info!("Handle Take2Ready");
let mut graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if graph.parameters.operator_pubkey == env::get_node_pubkey()?
Expand Down Expand Up @@ -612,6 +642,7 @@ pub async fn recv_and_dispatch(
}
// AssertSent sent by relayer
(GOATMessageContent::AssertSent(receive_data), Actor::Challenger) => {
tracing::info!("Handle AssertSent");
let mut graph =
get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if let Some(disprove_witness) = validate_assert(
Expand Down Expand Up @@ -657,6 +688,7 @@ pub async fn recv_and_dispatch(

// Relayer handles
(GOATMessageContent::Take1Sent(receive_data), Actor::Relayer) => {
tracing::info!("Handle Take1Sent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
let take1_txid = graph.take1.tx().compute_txid();
if tx_on_chain(client, &take1_txid).await? {
Expand All @@ -674,6 +706,7 @@ pub async fn recv_and_dispatch(
}
}
(GOATMessageContent::Take2Sent(receive_data), Actor::Relayer) => {
tracing::info!("Handle Take2Sent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if tx_on_chain(client, &graph.take2.tx().compute_txid()).await? {
finish_withdraw_unhappy_path(client, &receive_data.graph_id, &graph.take2).await?;
Expand All @@ -690,6 +723,7 @@ pub async fn recv_and_dispatch(
}
}
(GOATMessageContent::DisproveSent(receive_data), Actor::Relayer) => {
tracing::info!("Handle DisproveSent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if validate_disprove(
client,
Expand All @@ -715,6 +749,7 @@ pub async fn recv_and_dispatch(

// Other participants update graph status
(GOATMessageContent::KickoffSent(receive_data), _) => {
tracing::info!("Handle KickoffSent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if tx_on_chain(client, &graph.kickoff.tx().compute_txid()).await? {
update_graph_fields(
Expand All @@ -729,6 +764,7 @@ pub async fn recv_and_dispatch(
}
}
(GOATMessageContent::ChallengeSent(receive_data), _) => {
tracing::info!("Handle ChallengeSent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if validate_challenge(
client,
Expand All @@ -749,6 +785,7 @@ pub async fn recv_and_dispatch(
}
}
(GOATMessageContent::Take1Sent(receive_data), _) => {
tracing::info!("Handle Take1Sent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if tx_on_chain(client, &graph.take1.tx().compute_txid()).await? {
update_graph_fields(
Expand All @@ -764,6 +801,7 @@ pub async fn recv_and_dispatch(
}
}
(GOATMessageContent::Take2Sent(receive_data), _) => {
tracing::info!("Handle Take2Sent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if tx_on_chain(client, &graph.take2.tx().compute_txid()).await? {
update_graph_fields(
Expand All @@ -779,6 +817,7 @@ pub async fn recv_and_dispatch(
}
}
(GOATMessageContent::DisproveSent(receive_data), _) => {
tracing::info!("Handle DisproveSent");
let graph = get_graph(client, receive_data.instance_id, receive_data.graph_id).await?;
if validate_disprove(
client,
Expand All @@ -799,7 +838,6 @@ pub async fn recv_and_dispatch(
// NOTE: clean up other graphs?
}
}

_ => {}
}
Ok(())
Expand All @@ -811,7 +849,7 @@ pub(crate) fn send_to_peer(
) -> Result<MessageId, Box<dyn std::error::Error>> {
let actor = message.actor.to_string();
let gossipsub_topic = gossipsub::IdentTopic::new(actor);
Ok(swarm.behaviour_mut().gossipsub.publish(gossipsub_topic, &*message.content)?)
Ok(swarm.behaviour_mut().gossipsub.publish(gossipsub_topic, serde_json::to_vec(&message)?)?)
}

/// call the rpc service
Expand Down
18 changes: 10 additions & 8 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Create a Gosspipsub topic, we create 3 topics: committee, challenger, and operator
let topics = [Actor::Committee, Actor::Challenger, Actor::Operator]
let topics = [Actor::Committee, Actor::Challenger, Actor::Operator, Actor::Relayer, Actor::All]
.iter()
.map(|a| {
let topic_name = a.to_string();
Expand Down Expand Up @@ -235,16 +235,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
select! {
// For testing only
Ok(Some(line)) = stdin.next_line() => {
let commands = line.split(":").collect::<Vec<_>>();
if commands.len() < 2 || commands[0].trim().is_empty() {
println!("Message format: actor:message");
let commands = match line.split_once(":") {
Some((actor,msg)) => (actor.trim(),msg),
_ => {
println!("Message format: actor:message");
continue
}
}
};

if let Some(gossipsub_topic) = topics.get(commands[0]) {
if let Some(gossipsub_topic) = topics.get(commands.0) {
let message = serde_json::to_vec(&GOATMessage{
actor: Actor::from_str(commands[0]).unwrap(),
content: commands[1].as_bytes().to_vec(),
actor: Actor::from_str(commands.0).unwrap(),
content: commands.1.as_bytes().to_vec(),
}).unwrap();
if let Err(e) = swarm
.behaviour_mut()
Expand Down
Loading