Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 0 additions & 195 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@ impl BifrostInner {

if tail.is_sealed()
&& matches!(opts, FindTailOptions::ConsistentRead)
&& Configuration::pinned().bifrost.experimental_chain_sealing
&& !logs.chain(&log_id).expect("log must exist").is_sealed()
{
debug!(%log_id, "Loglet {} is sealed but the chain is not. Sealing the chain", loglet.debug_str());
Expand Down Expand Up @@ -965,200 +964,6 @@ mod tests {
#[restate_core::test(start_paused = true)]
async fn test_read_across_segments() -> googletest::Result<()> {
const LOG_ID: LogId = LogId::new(0);
let node_env = TestCoreEnvBuilder::with_incoming_only_connector()
.set_partition_table(PartitionTable::with_equally_sized_partitions(
Version::MIN,
1,
))
.build()
.await;
// disable seal-marker feature flag
let mut config = Configuration::pinned().clone();
config.bifrost.experimental_chain_sealing = false;
Configuration::set(config);

let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await;

let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// Lsns [1..5]
for i in 1..=5 {
// Append a record to memory
let lsn = appender.append(format!("segment-1-{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
}

// not sealed, tail is what we expect
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailOptions::default())
.await?,
pat!(TailState::Open(eq(Lsn::new(6))))
);

let segment_1 = bifrost
.inner
.find_loglet_for_lsn(LOG_ID, Lsn::OLDEST)
.await?
.unwrap();

// seal the segment
bifrost
.admin()
.seal(LOG_ID, segment_1.segment_index(), SealMetadata::default())
.await?;

// sealed, tail is what we expect
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailOptions::default())
.await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

println!("attempting to read during reconfiguration");
// attempting to read from bifrost will result in a timeout since metadata sees this as an open
// segment but the segment itself is sealed. This means reconfiguration is in-progress
// and we can't confidently read records.
assert!(
tokio::time::timeout(Duration::from_secs(5), bifrost.read(LOG_ID, Lsn::new(2)))
.await
.is_err()
);

let metadata = Metadata::current();
let old_version = metadata.logs_version();

let mut builder = metadata.logs_ref().clone().into_builder();
let mut chain_builder = builder.chain(LOG_ID).unwrap();
assert_eq!(1, chain_builder.num_segments());
let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory);
// deliberately skips Lsn::from(6) to create a zombie record in segment 1. Segment 1 now has 4 records.
chain_builder.append_segment(Lsn::new(5), ProviderKind::InMemory, new_segment_params)?;

let new_metadata = builder.build();
let new_version = new_metadata.version();
assert_eq!(new_version, old_version.next());
node_env
.metadata_writer
.global_metadata()
.put(
new_metadata.into(),
Precondition::MatchesVersion(old_version),
)
.await?;

assert_eq!(new_version, metadata.logs_version());

{
// validate that the stored metadata matches our expectations.
let new_metadata = metadata.logs_ref().clone();
let chain_builder = new_metadata.chain(&LOG_ID).unwrap();
assert_eq!(2, chain_builder.num_segments());
}

// find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after
// the new segment was created at tail of the chain with base_lsn=5
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailOptions::default())
.await?,
pat!(TailState::Open(eq(Lsn::new(5))))
);

// appends should go to the new segment
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// Lsns [5..7]
for i in 5..=7 {
// Append a record to memory
let lsn = appender.append(format!("segment-2-{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
}

// tail is now 8 and open.
assert_that!(
bifrost
.find_tail(LOG_ID, FindTailOptions::default())
.await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

// validating that segment 1 is still sealed and has its own tail at Lsn (6)
assert_that!(
segment_1.find_tail(FindTailOptions::default()).await?,
pat!(TailState::Sealed(eq(Lsn::new(6))))
);

let segment_2 = bifrost
.inner
.find_loglet_for_lsn(LOG_ID, Lsn::new(5))
.await?
.unwrap();

assert_ne!(segment_1, segment_2);

// segment 2 is open and at 8 as previously validated through bifrost interface
assert_that!(
segment_2.find_tail(FindTailOptions::default()).await?,
pat!(TailState::Open(eq(Lsn::new(8))))
);

// Reading the log. (OLDEST)
let record = bifrost.read(LOG_ID, Lsn::OLDEST).await?.unwrap();
assert_that!(record.sequence_number(), eq(Lsn::new(1)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("segment-1-1".to_owned())
);

let record = bifrost.read(LOG_ID, Lsn::new(2)).await?.unwrap();
assert_that!(record.sequence_number(), eq(Lsn::new(2)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("segment-1-2".to_owned())
);

// border of segment 1
let record = bifrost.read(LOG_ID, Lsn::new(4)).await?.unwrap();
assert_that!(record.sequence_number(), eq(Lsn::new(4)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("segment-1-4".to_owned())
);

// start of segment 2
let record = bifrost.read(LOG_ID, Lsn::new(5)).await?.unwrap();
assert_that!(record.sequence_number(), eq(Lsn::new(5)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("segment-2-5".to_owned())
);

// last record
let record = bifrost.read(LOG_ID, Lsn::new(7)).await?.unwrap();
assert_that!(record.sequence_number(), eq(Lsn::new(7)));
assert!(record.is_data_record());
assert_that!(
record.decode_unchecked::<String>(),
eq("segment-2-7".to_owned())
);

// 8 doesn't exist yet.
assert!(bifrost.read(LOG_ID, Lsn::new(8)).await?.is_none());

Ok(())
}

#[restate_core::test(start_paused = true)]
async fn test_read_across_segments_with_seal_marker() -> googletest::Result<()> {
const LOG_ID: LogId = LogId::new(0);
// enable seal-marker feature flag
let mut config = Configuration::pinned().clone();
config.bifrost.experimental_chain_sealing = true;
Configuration::set(config);

let node_env = TestCoreEnvBuilder::with_incoming_only_connector()
.set_partition_table(PartitionTable::with_equally_sized_partitions(
Expand Down
1 change: 0 additions & 1 deletion crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ impl<'a> BifrostAdmin<'a> {

if let Some(seal_metadata) = seal_metadata
&& tail.is_sealed()
&& Configuration::pinned().bifrost.experimental_chain_sealing
{
let tail_lsn = self
.inner
Expand Down
4 changes: 4 additions & 0 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest

// All (acknowledged) appends must have offsets less than the tail observed at the first
// Sealed() response of find_tail()
println!(
"last_acked={}, first observed seal at={first_observed_seal}",
all_committed.last().unwrap()
);
assert!(first_observed_seal > *all_committed.last().unwrap());

let reader = loglet
Expand Down
18 changes: 4 additions & 14 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use restate_core::ShutdownError;
use restate_core::my_node_id;
use restate_types::Version;
use restate_types::Versioned;
use restate_types::config::Configuration;
use restate_types::live::Live;
use restate_types::logs::KeyFilter;
use restate_types::logs::MatchKeyQuery;
Expand Down Expand Up @@ -414,14 +413,7 @@ impl Stream for LogReadStream {
// reconfiguration might bring us back to Reading on the
// same substream, we don't want to lose the resources
// allocated by underlying the stream.
if Configuration::pinned()
.bifrost
.experimental_chain_sealing
{
this.state.set(State::awaiting_or_seal_chain());
} else {
this.state.set(State::AwaitingReconfiguration);
}
this.state.set(State::awaiting_or_seal_chain());
continue;
}
}
Expand Down Expand Up @@ -975,6 +967,8 @@ mod tests {
pat!(Poll::Pending)
);

// this will automatically seal the chain since it detects that the tail segment is
// actually sealed.
let tail = bifrost
.find_tail(LOG_ID, FindTailOptions::default())
.await?;
Expand All @@ -987,7 +981,7 @@ mod tests {
let old_version = metadata.logs_version();
let mut builder = metadata.logs_ref().clone().into_builder();
let mut chain_builder = builder.chain(LOG_ID).unwrap();
assert_eq!(1, chain_builder.num_segments());
assert_eq!(2, chain_builder.num_segments());
let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory);
chain_builder.append_segment(Lsn::new(11), ProviderKind::InMemory, new_segment_params)?;

Expand Down Expand Up @@ -1160,10 +1154,6 @@ mod tests {

#[restate_core::test(start_paused = true)]
async fn test_readstream_chain_sealing() -> anyhow::Result<()> {
let mut config = Configuration::pinned().clone();
config.bifrost.experimental_chain_sealing = true;
Configuration::set(config);

const LOG_ID: LogId = LogId::new(0);

let node_env = TestCoreEnvBuilder::with_incoming_only_connector()
Expand Down
7 changes: 0 additions & 7 deletions crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ pub struct BifrostOptions {
/// of replicas, or for other reasons.
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub disable_auto_improvement: bool,

// Should be enabled by default in v1.5 or v1.6 depending on whether we'll
// allow rolling back to a release prior to <v1.4.3 or not.
#[cfg_attr(feature = "schemars", schemars(skip))]
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
pub experimental_chain_sealing: bool,
}

impl BifrostOptions {
Expand Down Expand Up @@ -128,7 +122,6 @@ impl Default for BifrostOptions {
seal_retry_interval: NonZeroFriendlyDuration::from_secs_unchecked(2),
record_cache_memory_size: ByteCount::from(250u64 * 1024 * 1024), // 250 MiB
disable_auto_improvement: false,
experimental_chain_sealing: false,
}
}
}
Expand Down
Loading