diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 6e3b112b23..207c311212 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -419,7 +419,6 @@ impl BifrostInner { if tail.is_sealed() && matches!(opts, FindTailOptions::ConsistentRead) - && Configuration::pinned().bifrost.experimental_chain_sealing && !logs.chain(&log_id).expect("log must exist").is_sealed() { debug!(%log_id, "Loglet {} is sealed but the chain is not. Sealing the chain", loglet.debug_str()); @@ -965,200 +964,6 @@ mod tests { #[restate_core::test(start_paused = true)] async fn test_read_across_segments() -> googletest::Result<()> { const LOG_ID: LogId = LogId::new(0); - let node_env = TestCoreEnvBuilder::with_incoming_only_connector() - .set_partition_table(PartitionTable::with_equally_sized_partitions( - Version::MIN, - 1, - )) - .build() - .await; - // disable seal-marker feature flag - let mut config = Configuration::pinned().clone(); - config.bifrost.experimental_chain_sealing = false; - Configuration::set(config); - - let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await; - - let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; - // Lsns [1..5] - for i in 1..=5 { - // Append a record to memory - let lsn = appender.append(format!("segment-1-{i}")).await?; - assert_eq!(Lsn::from(i), lsn); - } - - // not sealed, tail is what we expect - assert_that!( - bifrost - .find_tail(LOG_ID, FindTailOptions::default()) - .await?, - pat!(TailState::Open(eq(Lsn::new(6)))) - ); - - let segment_1 = bifrost - .inner - .find_loglet_for_lsn(LOG_ID, Lsn::OLDEST) - .await? - .unwrap(); - - // seal the segment - bifrost - .admin() - .seal(LOG_ID, segment_1.segment_index(), SealMetadata::default()) - .await?; - - // sealed, tail is what we expect - assert_that!( - bifrost - .find_tail(LOG_ID, FindTailOptions::default()) - .await?, - pat!(TailState::Sealed(eq(Lsn::new(6)))) - ); - - println!("attempting to read during reconfiguration"); - // attempting to read from bifrost will result in a timeout since metadata sees this as an open - // segment but the segment itself is sealed. This means reconfiguration is in-progress - // and we can't confidently read records. - assert!( - tokio::time::timeout(Duration::from_secs(5), bifrost.read(LOG_ID, Lsn::new(2))) - .await - .is_err() - ); - - let metadata = Metadata::current(); - let old_version = metadata.logs_version(); - - let mut builder = metadata.logs_ref().clone().into_builder(); - let mut chain_builder = builder.chain(LOG_ID).unwrap(); - assert_eq!(1, chain_builder.num_segments()); - let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); - // deliberately skips Lsn::from(6) to create a zombie record in segment 1. Segment 1 now has 4 records. - chain_builder.append_segment(Lsn::new(5), ProviderKind::InMemory, new_segment_params)?; - - let new_metadata = builder.build(); - let new_version = new_metadata.version(); - assert_eq!(new_version, old_version.next()); - node_env - .metadata_writer - .global_metadata() - .put( - new_metadata.into(), - Precondition::MatchesVersion(old_version), - ) - .await?; - - assert_eq!(new_version, metadata.logs_version()); - - { - // validate that the stored metadata matches our expectations. - let new_metadata = metadata.logs_ref().clone(); - let chain_builder = new_metadata.chain(&LOG_ID).unwrap(); - assert_eq!(2, chain_builder.num_segments()); - } - - // find_tail() on the underlying loglet returns (6) but for bifrost it should be (5) after - // the new segment was created at tail of the chain with base_lsn=5 - assert_that!( - bifrost - .find_tail(LOG_ID, FindTailOptions::default()) - .await?, - pat!(TailState::Open(eq(Lsn::new(5)))) - ); - - // appends should go to the new segment - let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; - // Lsns [5..7] - for i in 5..=7 { - // Append a record to memory - let lsn = appender.append(format!("segment-2-{i}")).await?; - assert_eq!(Lsn::from(i), lsn); - } - - // tail is now 8 and open. - assert_that!( - bifrost - .find_tail(LOG_ID, FindTailOptions::default()) - .await?, - pat!(TailState::Open(eq(Lsn::new(8)))) - ); - - // validating that segment 1 is still sealed and has its own tail at Lsn (6) - assert_that!( - segment_1.find_tail(FindTailOptions::default()).await?, - pat!(TailState::Sealed(eq(Lsn::new(6)))) - ); - - let segment_2 = bifrost - .inner - .find_loglet_for_lsn(LOG_ID, Lsn::new(5)) - .await? - .unwrap(); - - assert_ne!(segment_1, segment_2); - - // segment 2 is open and at 8 as previously validated through bifrost interface - assert_that!( - segment_2.find_tail(FindTailOptions::default()).await?, - pat!(TailState::Open(eq(Lsn::new(8)))) - ); - - // Reading the log. (OLDEST) - let record = bifrost.read(LOG_ID, Lsn::OLDEST).await?.unwrap(); - assert_that!(record.sequence_number(), eq(Lsn::new(1))); - assert!(record.is_data_record()); - assert_that!( - record.decode_unchecked::(), - eq("segment-1-1".to_owned()) - ); - - let record = bifrost.read(LOG_ID, Lsn::new(2)).await?.unwrap(); - assert_that!(record.sequence_number(), eq(Lsn::new(2))); - assert!(record.is_data_record()); - assert_that!( - record.decode_unchecked::(), - eq("segment-1-2".to_owned()) - ); - - // border of segment 1 - let record = bifrost.read(LOG_ID, Lsn::new(4)).await?.unwrap(); - assert_that!(record.sequence_number(), eq(Lsn::new(4))); - assert!(record.is_data_record()); - assert_that!( - record.decode_unchecked::(), - eq("segment-1-4".to_owned()) - ); - - // start of segment 2 - let record = bifrost.read(LOG_ID, Lsn::new(5)).await?.unwrap(); - assert_that!(record.sequence_number(), eq(Lsn::new(5))); - assert!(record.is_data_record()); - assert_that!( - record.decode_unchecked::(), - eq("segment-2-5".to_owned()) - ); - - // last record - let record = bifrost.read(LOG_ID, Lsn::new(7)).await?.unwrap(); - assert_that!(record.sequence_number(), eq(Lsn::new(7))); - assert!(record.is_data_record()); - assert_that!( - record.decode_unchecked::(), - eq("segment-2-7".to_owned()) - ); - - // 8 doesn't exist yet. - assert!(bifrost.read(LOG_ID, Lsn::new(8)).await?.is_none()); - - Ok(()) - } - - #[restate_core::test(start_paused = true)] - async fn test_read_across_segments_with_seal_marker() -> googletest::Result<()> { - const LOG_ID: LogId = LogId::new(0); - // enable seal-marker feature flag - let mut config = Configuration::pinned().clone(); - config.bifrost.experimental_chain_sealing = true; - Configuration::set(config); let node_env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 0b7487bb19..c27389852e 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -227,7 +227,6 @@ impl<'a> BifrostAdmin<'a> { if let Some(seal_metadata) = seal_metadata && tail.is_sealed() - && Configuration::pinned().bifrost.experimental_chain_sealing { let tail_lsn = self .inner diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index 30090da3c0..c046356463 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -595,6 +595,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc) -> googletest // All (acknowledged) appends must have offsets less than the tail observed at the first // Sealed() response of find_tail() + println!( + "last_acked={}, first observed seal at={first_observed_seal}", + all_committed.last().unwrap() + ); assert!(first_observed_seal > *all_committed.last().unwrap()); let reader = loglet diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 835a1f6938..8e52de985d 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -28,7 +28,6 @@ use restate_core::ShutdownError; use restate_core::my_node_id; use restate_types::Version; use restate_types::Versioned; -use restate_types::config::Configuration; use restate_types::live::Live; use restate_types::logs::KeyFilter; use restate_types::logs::MatchKeyQuery; @@ -414,14 +413,7 @@ impl Stream for LogReadStream { // reconfiguration might bring us back to Reading on the // same substream, we don't want to lose the resources // allocated by underlying the stream. - if Configuration::pinned() - .bifrost - .experimental_chain_sealing - { - this.state.set(State::awaiting_or_seal_chain()); - } else { - this.state.set(State::AwaitingReconfiguration); - } + this.state.set(State::awaiting_or_seal_chain()); continue; } } @@ -975,6 +967,8 @@ mod tests { pat!(Poll::Pending) ); + // this will automatically seal the chain since it detects that the tail segment is + // actually sealed. let tail = bifrost .find_tail(LOG_ID, FindTailOptions::default()) .await?; @@ -987,7 +981,7 @@ mod tests { let old_version = metadata.logs_version(); let mut builder = metadata.logs_ref().clone().into_builder(); let mut chain_builder = builder.chain(LOG_ID).unwrap(); - assert_eq!(1, chain_builder.num_segments()); + assert_eq!(2, chain_builder.num_segments()); let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); chain_builder.append_segment(Lsn::new(11), ProviderKind::InMemory, new_segment_params)?; @@ -1160,10 +1154,6 @@ mod tests { #[restate_core::test(start_paused = true)] async fn test_readstream_chain_sealing() -> anyhow::Result<()> { - let mut config = Configuration::pinned().clone(); - config.bifrost.experimental_chain_sealing = true; - Configuration::set(config); - const LOG_ID: LogId = LogId::new(0); let node_env = TestCoreEnvBuilder::with_incoming_only_connector() diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 0a44db0530..9c300b4530 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -86,12 +86,6 @@ pub struct BifrostOptions { /// of replicas, or for other reasons. #[cfg_attr(feature = "schemars", schemars(with = "String"))] pub disable_auto_improvement: bool, - - // Should be enabled by default in v1.5 or v1.6 depending on whether we'll - // allow rolling back to a release prior to