1 change: 1 addition & 0 deletions nodes/nomos-executor/config.yaml
@@ -252,6 +252,7 @@ cryptarchia:
state_recording_interval: [ 60, 0 ]
ibd:
peers: [ ]
last_blocks_with_blob_validation: 8640
Contributor

How should we interpret this value, and how would one select it?

Contributor Author

This value should be defined in the DA spec, since it represents how long DA nodes keep blobs. That period hasn't been determined yet, though. I added the parameter here because we need it for IBD anyway.
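For intuition on how the configured 8640 might be selected (both numbers below are assumptions for illustration, pending the DA spec): if blobs must stay retrievable for roughly one day and a block is produced about every 10 seconds, the value falls out as a simple quotient:

```rust
// Back-of-the-envelope sketch; the retention period and block interval are
// assumed values, not fixed by the spec yet.
const ASSUMED_BLOB_RETENTION_SECS: u64 = 24 * 60 * 60; // one day
const ASSUMED_SECS_PER_BLOCK: u64 = 10;

fn main() {
    let last_blocks_with_blob_validation =
        ASSUMED_BLOB_RETENTION_SECS / ASSUMED_SECS_PER_BLOCK;
    // 86_400 / 10 = 8_640, matching the value in config.yaml.
    println!("last_blocks_with_blob_validation: {last_blocks_with_blob_validation}");
}
```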

sync:
orphan:
max_orphan_cache_size: 5
3 changes: 2 additions & 1 deletion nodes/nomos-node/config.yaml
@@ -241,6 +241,7 @@ cryptarchia:
state_recording_interval: [ 60, 0 ]
ibd:
peers: [ ]
last_blocks_with_blob_validation: 8640
sync:
orphan:
max_orphan_cache_size: 5
@@ -261,4 +262,4 @@ membership:
sdp:
wallet:
known_keys:
- "0000000000000000000000000000000000000000000000000000000000000000"
- "0000000000000000000000000000000000000000000000000000000000000000"
4 changes: 4 additions & 0 deletions nomos-services/chain/chain-service/src/bootstrap/config.rs
@@ -1,5 +1,6 @@
use std::{collections::HashSet, hash::Hash, time::Duration};

use cryptarchia_engine::Length;
use nomos_utils::bounded_duration::{MinimalBoundedDuration, SECOND};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -43,6 +44,9 @@ where
/// when no download is needed at the moment from a peer.
#[serde(default = "default_delay_before_new_download")]
pub delay_before_new_download: Duration,
/// Threshold in number of remaining blocks to the target height
/// at which blob validation becomes enabled during IBD.
pub last_blocks_with_blob_validation: Length,
Contributor

Shouldn't this be first block with validation?

Contributor Author

I intended this parameter to represent the number of "remaining" blocks, so I think "last" makes more sense.

If there are N blocks in total that we have to catch up on during IBD, and last_blocks = 8000, we disable blob validation while processing the first N - 8000 blocks. After that, we enable blob validation for the final 8000 blocks.

Please let me know if this naming doesn't sound good; I'll think about other names (or update the comments).
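As a minimal sketch of that semantics (the function name, signature, and height arithmetic are illustrative, not code from this PR):

```rust
/// Illustrative only: whether blob validation should run for the block at
/// `block_height`, given the IBD target height and the configured window.
fn should_validate_blobs(block_height: u64, target_height: u64, last_blocks: u64) -> bool {
    // Blocks still to process before reaching the IBD target.
    let remaining = target_height.saturating_sub(block_height);
    // Validate only once we are within the final `last_blocks` window.
    remaining < last_blocks
}

#[test]
fn window_boundaries() {
    // N = 10_000 blocks to catch up, last_blocks = 8_000: the first 2_000
    // blocks skip blob validation, the final 8_000 are validated.
    assert!(!should_validate_blobs(2_000, 10_000, 8_000)); // still skipped
    assert!(should_validate_blobs(2_001, 10_000, 8_000)); // first validated block
    assert!(should_validate_blobs(10_000, 10_000, 8_000)); // target block
}
```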

}

const fn default_offline_grace_period() -> Duration {
61 changes: 45 additions & 16 deletions nomos-services/chain/chain-service/src/bootstrap/download.rs
@@ -6,6 +6,7 @@ use std::{
time::Duration,
};

use cryptarchia_engine::Length;
use cryptarchia_sync::HeaderId;
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt as _};
use overwatch::DynError;
@@ -22,7 +23,7 @@ pub struct Downloads<'a, NodeId, Block> {
/// [`Future`]s that read a single block from a [`Download`].
downloads: FuturesUnordered<BoxFuture<'a, DownloadResult<NodeId, Block>>>,
/// A set of blocks that are being targeted by [`Self::downloads`].
targets: HashSet<HeaderId>,
targets: HashSet<Target>,
/// [`Delay`] for peers that have no download needed at the moment.
delays: FuturesUnordered<BoxFuture<'a, Delay<NodeId>>>,
/// The duration of a delay.
@@ -143,7 +144,7 @@
}));
}

pub const fn targets(&self) -> &HashSet<HeaderId> {
pub const fn targets(&self) -> &HashSet<Target> {
&self.targets
}

@@ -175,18 +176,38 @@ pub enum DownloadsOutput<NodeId, Block> {
pub struct Download<NodeId, Block> {
peer: NodeId,
/// The target block this download aims to reach.
target: HeaderId,
target: Target,
/// A stream of blocks that may continue up to [`Self::target`].
stream: BoxedStream<Result<(HeaderId, Block), DynError>>,
/// The last block that was read from [`Self::stream`].
/// [`None`] if no blocks were read yet.
last: Option<HeaderId>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Target {
id: HeaderId,
height: Length,
}

impl Target {
pub const fn new(id: HeaderId, height: Length) -> Self {
Self { id, height }
}

pub const fn id(&self) -> HeaderId {
self.id
}

pub const fn height(&self) -> Length {
self.height
}
}

impl<NodeId, Block> Download<NodeId, Block> {
pub fn new(
peer: NodeId,
target: HeaderId,
target: Target,
stream: BoxedStream<Result<(HeaderId, Block), DynError>>,
) -> Self {
Self {
@@ -204,6 +225,10 @@ impl<NodeId, Block> Download<NodeId, Block> {
pub const fn last(&self) -> Option<HeaderId> {
self.last
}

pub const fn target(&self) -> Target {
self.target
}
}

impl<NodeId, Block> Stream for Download<NodeId, Block>
@@ -216,7 +241,7 @@
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Check if the target block has already been reached.
if let Some(last) = self.last {
if last == self.target {
if last == self.target.id() {
return Poll::Ready(None);
}
}
@@ -284,7 +309,7 @@ mod tests {
#[tokio::test]
async fn download_empty_stream() {
let peer: TestNodeId = 1;
let target = header_id(1);
let target = Target::new(header_id(1), 1u64.into());
let stream = block_stream(vec![]);
let mut download = Download::new(peer, target, stream);

@@ -294,7 +319,7 @@
#[tokio::test]
async fn download_blocks_until_target() {
let peer: TestNodeId = 1;
let target = header_id(3);
let target = Target::new(header_id(3), 3u64.into());
let stream = block_stream(vec![
Ok((header_id(1), 100)),
Ok((header_id(2), 200)),
@@ -329,7 +354,7 @@
#[tokio::test]
async fn download_blocks_if_no_target_in_stream() {
let peer: TestNodeId = 1;
let target = header_id(4);
let target = Target::new(header_id(4), 4u64.into());
let stream = block_stream(vec![
Ok((header_id(1), 100)),
Ok((header_id(2), 200)),
@@ -356,7 +381,7 @@
#[tokio::test]
async fn download_with_error() {
let peer: TestNodeId = 1;
let target = header_id(3);
let target = Target::new(header_id(3), 3u64.into());
let stream = block_stream(vec![
Ok((header_id(1), 100)),
Err(DynError::from("test error")),
@@ -377,7 +402,7 @@
#[tokio::test]
async fn add_single_download() {
let mut downloads = Downloads::new(Duration::from_millis(1));
let target = header_id(2);
let target = Target::new(header_id(2), 2u64.into());
let download = Download::new(
1,
target,
@@ -431,7 +456,7 @@
#[tokio::test]
async fn add_single_download_with_error() {
let mut downloads = Downloads::new(Duration::from_millis(1));
let target = header_id(2);
let target = Target::new(header_id(2), 2u64.into());
let download = Download::new(
1,
target,
@@ -477,12 +502,12 @@

// Download 1: Single block
let peer1: TestNodeId = 1;
let target1 = header_id(1);
let target1 = Target::new(header_id(1), 1u64.into());
let download1 = Download::new(peer1, target1, block_stream(vec![Ok((header_id(1), 100))]));

// Download 2: Two blocks
let peer2: TestNodeId = 2;
let target2 = header_id(3);
let target2 = Target::new(header_id(3), 2u64.into());
let download2 = Download::new(
peer2,
target2,
@@ -566,7 +591,7 @@

// Download 1
let peer1: TestNodeId = 1;
let target = header_id(1);
let target = Target::new(header_id(1), 1u64.into());
let download1 = Download::new(peer1, target, block_stream(vec![Ok((header_id(1), 100))]));

// Download 2: with the same target
@@ -616,7 +641,7 @@
// Add a download for peer1
let peer1: TestNodeId = 1;
// An empty stream for simplicity
let download = Download::new(peer1, header_id(1), block_stream(vec![]));
let download = Download::new(
peer1,
Target::new(header_id(1), 1u64.into()),
block_stream(vec![]),
);
downloads.add_download(download);

// Add a delay for peer2
@@ -644,7 +673,7 @@
// An empty stream for simplicity
let download = Download::new(
peer1,
header_id(1),
Target::new(header_id(1), 1u64.into()),
slow_block_stream(vec![Ok((header_id(1), 100))], Duration::from_secs(2)),
);
downloads.add_download(download);