diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index a7e393196..f545728c5 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -835,7 +835,7 @@ mod tests { use test_log::test; use restate_bifrost::providers::memory_loglet; - use restate_bifrost::{Bifrost, BifrostService}; + use restate_bifrost::{Bifrost, BifrostService, ErrorRecoveryStrategy}; use restate_core::network::{ FailingConnector, Incoming, MessageHandler, MockPeerConnection, NetworkServerBuilder, }; @@ -875,7 +875,7 @@ mod tests { let _ = builder.build().await; bifrost_svc.start().await?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; TaskCenter::spawn(TaskKind::SystemService, "cluster-controller", svc.run())?; @@ -972,7 +972,7 @@ mod tests { let (_node_2, _node2_reactor) = node_2.process_with_message_handler(get_node_state_handler)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; for i in 1..=20 { let lsn = appender.append("").await?; assert_eq!(Lsn::from(i), lsn); @@ -1049,7 +1049,7 @@ mod tests { let (_node_2, _node2_reactor) = node_2.process_with_message_handler(get_node_state_handler)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; for i in 1..=20 { let lsn = appender.append(format!("record{i}")).await?; assert_eq!(Lsn::from(i), lsn); @@ -1112,7 +1112,7 @@ mod tests { }) .await?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; for i in 1..=5 { let lsn = appender.append(format!("record{i}")).await?; assert_eq!(Lsn::from(i), lsn); diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index e3f8449fa..4b3df3785 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -12,6 +12,8 @@ default = [] replicated-loglet = [] memory-loglet = ["restate-types/memory-loglet"] test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"] +# enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon. +auto-extend = [] [dependencies] restate-core = { workspace = true } diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 65feaa37b..81d8a26ba 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -21,7 +21,7 @@ use restate_types::logs::{LogId, Lsn, Record}; use restate_types::retries::RetryIter; use restate_types::storage::StorageEncode; -use crate::bifrost::BifrostInner; +use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy}; use crate::loglet::AppendError; use crate::loglet_wrapper::LogletWrapper; use crate::{Error, InputRecord, Result}; @@ -31,17 +31,25 @@ pub struct Appender { log_id: LogId, #[debug(skip)] pub(super) config: Live, + // todo: asoli remove + #[allow(unused)] + error_recovery_strategy: ErrorRecoveryStrategy, loglet_cache: Option, #[debug(skip)] bifrost_inner: Arc, } impl Appender { - pub(crate) fn new(log_id: LogId, bifrost_inner: Arc) -> Self { + pub(crate) fn new( + log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, + bifrost_inner: Arc, + ) -> Self { let config = Configuration::updateable(); Self { log_id, config, + error_recovery_strategy, loglet_cache: Default::default(), bifrost_inner, } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 6c769aac6..06b86ff76 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -27,6 +27,40 @@ use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; use crate::{Error, InputRecord, LogReadStream, Result}; +/// The strategy to use when bifrost fails to append or when it observes +/// a sealed loglet while it's tailing a log. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ErrorRecoveryStrategy { + /// Eagerly extend the chain by creating a new loglet and appending to it. + ExtendChainPreferred, + /// Extend the chain only running out of patience, others might be better suited to reconfigure + /// the chain, but when desperate, we are allowed to seal and extend. + ExtendChainAllowed, + /// Do not extend the chain, wait indefinitely instead until the error disappears. + Wait, +} + +impl ErrorRecoveryStrategy { + /// Conditional on a temporary feature gate `auto-extend` until transition is complete + pub fn extend_preferred() -> Self { + if cfg!(feature = "auto-extend") { + Self::ExtendChainPreferred + } else { + Self::Wait + } + } +} + +impl Default for ErrorRecoveryStrategy { + fn default() -> Self { + if cfg!(feature = "auto-extend") { + Self::ExtendChainAllowed + } else { + Self::Wait + } + } +} + /// Bifrost is Restate's durable interconnect system /// /// Bifrost is a mutable-friendly handle to access the system. You don't need @@ -97,10 +131,13 @@ impl Bifrost { pub async fn append( &self, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, body: impl Into>, ) -> Result { self.inner.fail_if_shutting_down()?; - self.inner.append(log_id, body).await + self.inner + .append(log_id, error_recovery_strategy, body) + .await } /// Appends a batch of records to a log. The log id must exist, otherwise the @@ -116,10 +153,13 @@ impl Bifrost { pub async fn append_batch( &self, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, batch: Vec>>, ) -> Result { self.inner.fail_if_shutting_down()?; - self.inner.append_batch(log_id, batch).await + self.inner + .append_batch(log_id, error_recovery_strategy, batch) + .await } /// Read the next record from the LSN provided. The `from` indicates the LSN where we will @@ -171,15 +211,24 @@ impl Bifrost { /// The best way to write to Bifrost is to hold on to an [`Appender`] and reuse it across /// calls, this allows internal caching of recently accessed loglets and recycling write /// buffers. - pub fn create_appender(&self, log_id: LogId) -> Result { + pub fn create_appender( + &self, + log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, + ) -> Result { self.inner.fail_if_shutting_down()?; self.inner.check_log_id(log_id)?; - Ok(Appender::new(log_id, self.inner.clone())) + Ok(Appender::new( + log_id, + error_recovery_strategy, + self.inner.clone(), + )) } pub fn create_background_appender( &self, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, queue_capacity: usize, max_batch_size: usize, ) -> Result> @@ -187,7 +236,7 @@ impl Bifrost { T: StorageEncode, { Ok(BackgroundAppender::new( - self.create_appender(log_id)?, + self.create_appender(log_id, error_recovery_strategy)?, queue_capacity, max_batch_size, )) @@ -279,17 +328,21 @@ impl BifrostInner { pub async fn append( self: &Arc, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, record: impl Into>, ) -> Result { - Appender::new(log_id, Arc::clone(self)).append(record).await + Appender::new(log_id, error_recovery_strategy, Arc::clone(self)) + .append(record) + .await } pub async fn append_batch( self: &Arc, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, batch: Vec>>, ) -> Result { - Appender::new(log_id, Arc::clone(self)) + Appender::new(log_id, error_recovery_strategy, Arc::clone(self)) .append_batch(batch) .await } @@ -523,8 +576,8 @@ mod tests { let clean_bifrost_clone = bifrost.clone(); - let mut appender_0 = bifrost.create_appender(LogId::new(0))?; - let mut appender_3 = bifrost.create_appender(LogId::new(3))?; + let mut appender_0 = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?; + let mut appender_3 = bifrost.create_appender(LogId::new(3), ErrorRecoveryStrategy::Wait)?; let mut max_lsn = Lsn::INVALID; for i in 1..=5 { // Append a record to memory @@ -536,13 +589,14 @@ mod tests { // Append to a log that doesn't exist. let invalid_log = LogId::from(num_partitions + 1); - let resp = bifrost.create_appender(invalid_log); + let resp = bifrost.create_appender(invalid_log, ErrorRecoveryStrategy::Wait); assert_that!(resp, pat!(Err(pat!(Error::UnknownLogId(eq(invalid_log)))))); // use a cloned bifrost. let cloned_bifrost = bifrost.clone(); - let mut second_appender_0 = cloned_bifrost.create_appender(LogId::new(0))?; + let mut second_appender_0 = + cloned_bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?; for _ in 1..=5 { // Append a record to memory let lsn = second_appender_0.append("").await?; @@ -553,7 +607,7 @@ mod tests { // Ensure original clone writes to the same underlying loglet. let lsn = clean_bifrost_clone - .create_appender(LogId::new(0))? + .create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)? .append("") .await?; assert_eq!(max_lsn + Lsn::from(1), lsn); @@ -591,7 +645,10 @@ mod tests { let bifrost = Bifrost::init_with_factory(factory).await; let start = tokio::time::Instant::now(); - let lsn = bifrost.create_appender(LogId::new(0))?.append("").await?; + let lsn = bifrost + .create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)? + .append("") + .await?; assert_eq!(Lsn::from(1), lsn); // The append was properly delayed assert_eq!(delay, start.elapsed()); @@ -618,7 +675,7 @@ mod tests { assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // append 10 records for _ in 1..=10 { appender.append("").await?; @@ -687,7 +744,7 @@ mod tests { &node_env.metadata_store_client, ); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [1..5] for i in 1..=5 { // Append a record to memory @@ -771,7 +828,7 @@ mod tests { ); // appends should go to the new segment - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [5..7] for i in 5..=7 { // Append a record to memory @@ -882,7 +939,7 @@ mod tests { let append_counter = append_counter.clone(); let stop_signal = stop_signal.clone(); let bifrost = bifrost.clone(); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; async move { let mut i = 0; while !stop_signal.load(Ordering::Relaxed) { diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index 7cfaf7dbb..305a07c63 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -24,7 +24,7 @@ mod watchdog; pub use appender::Appender; pub use background_appender::{AppenderHandle, BackgroundAppender, CommitToken, LogSender}; -pub use bifrost::Bifrost; +pub use bifrost::{Bifrost, ErrorRecoveryStrategy}; pub use bifrost_admin::{BifrostAdmin, SealedSegment}; pub use error::{Error, Result}; pub use read_stream::LogReadStream; diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8b95884b3..8ac81c3ee 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -454,7 +454,7 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{BifrostAdmin, BifrostService}; + use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy}; #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] @@ -476,7 +476,7 @@ mod tests { svc.start().await.expect("loglet must start"); let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written @@ -558,7 +558,7 @@ mod tests { ); svc.start().await.expect("loglet must start"); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); @@ -651,7 +651,7 @@ mod tests { // create the reader and put it on the side. let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, Lsn::OLDEST, Lsn::MAX)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // We should be at tail, any attempt to read will yield `pending`. assert_that!( futures::poll!(std::pin::pin!(reader.next())), @@ -810,7 +810,7 @@ mod tests { ); svc.start().await.expect("loglet must start"); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written @@ -922,7 +922,7 @@ mod tests { .enable_in_memory_loglet(); let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; let metadata = Metadata::current(); // prepare a chain that starts from Lsn 10 (we expect trim from OLDEST -> 9) diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index feec06619..d3a5418ce 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use bytes::{Bytes, BytesMut}; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, ErrorRecoveryStrategy}; use restate_core::{Metadata, ShutdownError}; use restate_storage_api::deduplication_table::DedupInformation; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; @@ -231,6 +231,10 @@ pub enum Error { /// /// Important: This method must only be called in the context of a [`TaskCenter`] task because /// it needs access to [`metadata()`]. +/// +/// todo: This method should be removed in favor of using Appender/BackgroundAppender API in +/// Bifrost. Additionally, the check for partition_table is probably unnecessary in the vast +/// majority of call-sites. pub async fn append_envelope_to_bifrost( bifrost: &Bifrost, envelope: Arc, @@ -246,7 +250,9 @@ pub async fn append_envelope_to_bifrost( let log_id = LogId::from(*partition_id); // todo: Pass the envelope as `Arc` to `append_envelope_to_bifrost` instead. Possibly use // triomphe's UniqueArc for a mutable Arc during construction. - let lsn = bifrost.append(log_id, envelope).await?; + let lsn = bifrost + .append(log_id, ErrorRecoveryStrategy::default(), envelope) + .await?; Ok((log_id, lsn)) } diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index a3611463c..5f561206d 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -10,7 +10,7 @@ use crate::partition::leadership::Error; use futures::never::Never; -use restate_bifrost::{Bifrost, CommitToken}; +use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy}; use restate_core::my_node_id; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; use restate_types::identifiers::{PartitionId, PartitionKey}; @@ -44,6 +44,7 @@ impl SelfProposer { let bifrost_appender = bifrost .create_background_appender( LogId::from(partition_id), + ErrorRecoveryStrategy::extend_preferred(), BIFROST_QUEUE_SIZE, MAX_BIFROST_APPEND_BATCH, )? diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 621aa73b6..8fa79c7f2 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -21,7 +21,7 @@ mod tests { use futures_util::StreamExt; use googletest::prelude::*; - use restate_bifrost::loglet::AppendError; + use restate_bifrost::{loglet::AppendError, ErrorRecoveryStrategy}; use restate_core::{cancellation_token, Metadata, TaskCenterFutureExt}; use test_log::test; @@ -235,6 +235,7 @@ mod tests { let offset = bifrost .append( log_id, + ErrorRecoveryStrategy::Wait, format!("appender-{appender_id}-record{i}"), ) .await?; diff --git a/tools/bifrost-benchpress/src/append_latency.rs b/tools/bifrost-benchpress/src/append_latency.rs index 37d5a82df..9c696a020 100644 --- a/tools/bifrost-benchpress/src/append_latency.rs +++ b/tools/bifrost-benchpress/src/append_latency.rs @@ -14,7 +14,7 @@ use bytes::BytesMut; use hdrhistogram::Histogram; use tracing::info; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, ErrorRecoveryStrategy}; use restate_types::logs::{LogId, WithKeys}; use crate::util::{print_latencies, DummyPayload}; @@ -41,7 +41,8 @@ pub async fn run( let blob = BytesMut::zeroed(args.payload_size).freeze(); let mut append_latencies = Histogram::::new(3)?; let mut counter = 0; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = + bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::extend_preferred())?; let start = Instant::now(); loop { if counter >= args.num_records { diff --git a/tools/bifrost-benchpress/src/write_to_read.rs b/tools/bifrost-benchpress/src/write_to_read.rs index f4f58f63a..eff7ac025 100644 --- a/tools/bifrost-benchpress/src/write_to_read.rs +++ b/tools/bifrost-benchpress/src/write_to_read.rs @@ -16,7 +16,7 @@ use futures::StreamExt; use hdrhistogram::Histogram; use tracing::info; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, ErrorRecoveryStrategy}; use restate_core::{Metadata, TaskCenter, TaskHandle, TaskKind}; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, WithKeys}; @@ -96,6 +96,7 @@ pub async fn run(_common_args: &Arguments, args: &WriteToReadOpts, bifrost: Bifr let appender_handle = bifrost .create_background_appender( LOG_ID, + ErrorRecoveryStrategy::extend_preferred(), args.write_buffer_size, args.max_batch_size, )?