Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Move to stable MSRV 1.85
- Move to rust edition 2024

## [0.7.0] - 2024-10-21

Expand Down
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
[package]
name = "kadcast"
authors = ["herr-seppia <seppia@dusk.network>"]
version = "0.7.0"
edition = "2018"
version = "0.8.0-rc.0"
description = "Implementation of the Kadcast Network Protocol."
categories = ["network-programming"]
keywords = ["p2p", "network", "kad", "peer-to-peer", "kadcast"]
Expand All @@ -11,6 +10,7 @@ repository = "https://github.com/dusk-network/kadcast"
publish = true

rust-version = "1.85"
edition = "2024"

exclude = [".git*", "ARCHITECTURE.md", "architecture.jpg"]

Expand All @@ -30,7 +30,6 @@ tokio = { version = "1", features = [
raptorq = { version = "2.0", optional = true }
tracing = "0.1"
itertools = "0.10"
konst = "0.2"
socket2 = "0.4"
serde_derive = "1"
serde = "1"
Expand Down
2 changes: 1 addition & 1 deletion examples/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::io::{self, BufRead};
use clap::{App, Arg};
use kadcast::config::Config;
use kadcast::{MessageInfo, NetworkListen, Peer};
use rustc_tools_util::{get_version_info, VersionInfo};
use rustc_tools_util::{VersionInfo, get_version_info};
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let crate_info = get_version_info!();
Expand Down
2 changes: 1 addition & 1 deletion src/encoding/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::io::{self, Error, Read, Write};

use super::Marshallable;
use crate::{kbucket::BinaryID, K_ID_LEN_BYTES, K_NONCE_LEN};
use crate::{K_ID_LEN_BYTES, K_NONCE_LEN, kbucket::BinaryID};

#[derive(Debug, PartialEq, Clone, Copy)]
pub struct Header {
Expand Down
2 changes: 1 addition & 1 deletion src/encoding/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::io::{self, Error, Read, Write};
use semver::Version;

pub(crate) use super::payload::{BroadcastPayload, NodePayload};
pub use super::{header::Header, Marshallable};
pub use super::{Marshallable, header::Header};
use crate::kbucket::BinaryKey;

// PingMsg wire Ping message id.
Expand Down
2 changes: 1 addition & 1 deletion src/encoding/payload/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::convert::TryInto;
use std::io::{self, Read, Write};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};

use crate::{encoding::Marshallable, kbucket::BinaryKey, K_ID_LEN_BYTES};
use crate::{K_ID_LEN_BYTES, encoding::Marshallable, kbucket::BinaryKey};

#[derive(Debug, PartialEq)]
pub(crate) struct NodePayload {
Expand Down
2 changes: 1 addition & 1 deletion src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::encoding::message::{
use crate::kbucket::{BinaryKey, NodeInsertError, NodeInsertOk, Tree};
use crate::peer::{PeerInfo, PeerNode};
use crate::transport::{MessageBeanIn, MessageBeanOut};
use crate::{RwLock, K_K};
use crate::{K_K, RwLock};

/// Message metadata for incoming message notifications
#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tracing::info;
mod bucket;
mod key;
mod node;
use crate::config::BucketConfig;
use crate::K_BETA;
use crate::config::BucketConfig;

pub type BucketHeight = u8;

Expand Down Expand Up @@ -75,7 +75,7 @@ impl<V> Tree<V> {
let max_h = max_h.unwrap_or(BucketHeight::MAX);
self.buckets
.iter()
.filter(move |(&height, _)| height <= max_h)
.filter(move |&(&height, _)| height <= max_h)
.map(|(&height, bucket)| (height, bucket.pick::<K_BETA>()))
}

Expand Down
6 changes: 3 additions & 3 deletions src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use semver::Version;

use super::node::{Node, NodeEvictionStatus};
use super::BinaryKey;
use crate::config::BucketConfig;
use super::node::{Node, NodeEvictionStatus};
use crate::K_K;
use crate::config::BucketConfig;

/// Represents a bucket for storing nodes in a Kademlia routing table.
pub(super) struct Bucket<V> {
Expand Down Expand Up @@ -302,10 +302,10 @@ mod tests {
use std::time::Duration;

use super::*;
use crate::K_BETA;
use crate::kbucket::Tree;
use crate::peer::PeerNode;
use crate::tests::Result;
use crate::K_BETA;

impl<V> Bucket<V> {
pub fn last_id(&self) -> Option<&BinaryKey> {
Expand Down
2 changes: 1 addition & 1 deletion src/kbucket/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

use std::io;

use crate::encoding::Marshallable;
use crate::K_ID_LEN_BYTES;
use crate::K_NONCE_LEN;
use crate::encoding::Marshallable;

pub type BinaryKey = [u8; K_ID_LEN_BYTES];
pub type BinaryNonce = [u8; K_NONCE_LEN];
Expand Down
2 changes: 1 addition & 1 deletion src/kbucket/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

use std::time::{Duration, Instant};

use super::key::BinaryID;
use super::BucketHeight;
use super::key::BinaryID;

/// A struct representing a node in the network with an associated ID, value,
/// and eviction status.
Expand Down
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const K_DIFF_PRODUCED_BIT: usize = 8;

const fn get_k_k() -> usize {
match option_env!("KADCAST_K") {
Some(v) => match konst::primitive::parse_usize(v) {
Some(v) => match usize::from_str_radix(v, 10) {
Ok(e) => e,
Err(_) => DEFAULT_K_K,
},
Expand Down Expand Up @@ -243,7 +243,9 @@ impl Peer {
const LAST_BUCKET_IDX: u8 = MAX_BUCKET_HEIGHT as u8 - 1;
let ktable = self.ktable.read().await;
if height.is_none() && ktable.bucket_size(LAST_BUCKET_IDX) == 0 {
warn!("Broadcasting a new message with empty bucket height {LAST_BUCKET_IDX}")
warn!(
"Broadcasting a new message with empty bucket height {LAST_BUCKET_IDX}"
)
}
ktable
.extract(height)
Expand Down
2 changes: 1 addition & 1 deletion src/maintainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::encoding::message::{Header, Message};
use crate::kbucket::Tree;
use crate::peer::PeerInfo;
use crate::transport::MessageBeanOut;
use crate::{RwLock, K_ALPHA};
use crate::{K_ALPHA, RwLock};

pub(crate) struct TableMaintainer {
bootstrapping_nodes: Vec<String>,
Expand Down
2 changes: 1 addition & 1 deletion src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use blake2::{Blake2s256, Digest};

use crate::kbucket::{BinaryID, BinaryKey};
pub type PeerNode = Node<PeerInfo>;
use crate::K_ID_LEN_BYTES;
use crate::encoding::message::Header;
use crate::encoding::payload::{IpInfo, PeerEncodedInfo};
use crate::kbucket::Node;
use crate::K_ID_LEN_BYTES;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PeerInfo {
address: SocketAddr,
Expand Down
2 changes: 1 addition & 1 deletion src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error, info, trace, warn};

use crate::config::Config;
use crate::encoding::message::Message;
use crate::encoding::Marshallable;
use crate::encoding::message::Message;
use crate::rwlock::RwLock;
use crate::transport::encoding::{
Configurable, Decoder, Encoder, TransportDecoder, TransportEncoder,
Expand Down
10 changes: 5 additions & 5 deletions src/transport/encoding/raptorq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::io::{self, ErrorKind};
use blake2::{Blake2s256, Digest};
use safe::{SafeObjectTransmissionInformation, TransmissionInformationError};

use crate::encoding::{payload::BroadcastPayload, Marshallable};
use crate::encoding::{Marshallable, payload::BroadcastPayload};

mod decoder;
mod encoder;
Expand Down Expand Up @@ -116,7 +116,7 @@ mod tests {
let mut data = vec![0; 100_000];

for i in 0..data.len() {
data[i] = rand::Rng::gen(&mut rand::thread_rng());
data[i] = rand::Rng::r#gen(&mut rand::thread_rng());
}
let peer = PeerNode::generate("192.168.0.1:666", 0)?;
let header = peer.to_header();
Expand Down Expand Up @@ -173,7 +173,7 @@ mod tests {
let mut data = vec![0; DATA_LEN];

for i in 0..DATA_LEN {
data[i] = rand::Rng::gen(&mut rand::thread_rng());
data[i] = rand::Rng::r#gen(&mut rand::thread_rng());
}
let peer = PeerNode::generate("192.168.0.1:666", 0)?;
let header = peer.to_header();
Expand Down Expand Up @@ -201,7 +201,7 @@ mod tests {
for _ in 0..junks_messages {
let mut gossip_frame = vec![];
for _ in 0..DATA_LEN {
gossip_frame.push(rand::Rng::gen(&mut rand::thread_rng()));
gossip_frame.push(rand::Rng::r#gen(&mut rand::thread_rng()));
}
let msg = Message::broadcast(
header,
Expand Down Expand Up @@ -258,7 +258,7 @@ mod tests {
c.seek(std::io::SeekFrom::Start(0))?;
c.read_to_end(&mut bytes)?;
for i in 44..bytes.len() {
bytes[i] = rand::Rng::gen(&mut rand::thread_rng());
bytes[i] = rand::Rng::r#gen(&mut rand::thread_rng());
}
let c = Cursor::new(bytes);
let mut reader = BufReader::new(c);
Expand Down
4 changes: 2 additions & 2 deletions src/transport/encoding/raptorq/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use tracing::{debug, trace, warn};
use super::{ChunkedPayload, RAY_ID_SIZE, TRANSMISSION_INFO_SIZE};
use crate::encoding::message::Message;
use crate::encoding::payload::BroadcastPayload;
use crate::transport::encoding::Configurable;
use crate::transport::Decoder;
use crate::transport::encoding::Configurable;

const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(60);
const DEFAULT_CACHE_PRUNE_EVERY: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -249,8 +249,8 @@ mod tests {
use super::*;
use crate::peer::PeerNode;
use crate::tests::Result;
use crate::transport::encoding::raptorq::RaptorQEncoder;
use crate::transport::encoding::Encoder;
use crate::transport::encoding::raptorq::RaptorQEncoder;

impl RaptorQDecoder {
fn cache_size(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/encoding/raptorq/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::io;

use crate::encoding::message::Message;
use crate::encoding::payload::BroadcastPayload;
use crate::transport::encoding::Configurable;
use crate::transport::Encoder;
use crate::transport::encoding::Configurable;

const DEFAULT_MIN_REPAIR_PACKETS_PER_BLOCK: u32 = 5;
const DEFAULT_MTU: u16 = 1300;
Expand Down
2 changes: 1 addition & 1 deletion src/transport/encoding/raptorq/safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::convert::TryFrom;

use raptorq::ObjectTransmissionInformation;

use super::{encoder::MAX_MTU, TRANSMISSION_INFO_SIZE};
use super::{TRANSMISSION_INFO_SIZE, encoder::MAX_MTU};

// This should eventually become <https://doc.rust-lang.org/std/primitive.u64.html#method.div_ceil>
// when it gets stabilized, and this function should be removed.
Expand Down
6 changes: 4 additions & 2 deletions src/transport/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::runtime::Handle;
use tokio::task::block_in_place;
use tokio::time::{self, timeout, Interval};
use tokio::time::{self, Interval, timeout};
use tracing::{info, warn};

use super::encoding::Configurable;
Expand Down Expand Up @@ -93,7 +93,9 @@ impl MultipleOutSocket {
}
Ok(Err(e)) | Err(e) => {
if i < max_retry {
warn!("Unable to send msg, temptative {i}/{max_retry} - {e}");
warn!(
"Unable to send msg, temptative {i}/{max_retry} - {e}"
);
tokio::time::sleep(self.udp_send_retry_interval).await
} else {
return Err(e);
Expand Down
8 changes: 6 additions & 2 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mod tests {
tokio::time::sleep(Duration::from_millis(2000)).await;
let mut data: Vec<u8> = vec![0; MESSAGE_SIZE];
for i in 0..data.len() {
data[i] = rand::Rng::gen(&mut rand::thread_rng());
data[i] = rand::Rng::r#gen(&mut rand::thread_rng());
}
for i in 0..NODES {
info!("ROUTING TABLE PEER #{}", i);
Expand Down Expand Up @@ -134,7 +134,11 @@ mod tests {
let removed = missing.remove(&(receiver_port as i32));
info!(
"RECEIVER PORT: {} - Message N° {} got from {:?} - Left {} - Removed {:?}",
receiver_port, i, message.1, missing.len(), removed
receiver_port,
i,
message.1,
missing.len(),
removed
);
}
}
Expand Down