diff --git a/configs/server.json b/configs/server.json index a7bade19f..46b389991 100644 --- a/configs/server.json +++ b/configs/server.json @@ -143,7 +143,11 @@ } }, "state": { - "enforce_fsync": false + "enforce_fsync": false, + "idle_timeout": "5 m", + "server_confirmation": "wait", + "max_file_operation_retries": 1, + "retry_delay": "1 s" }, "runtime": { "path": "runtime" diff --git a/configs/server.toml b/configs/server.toml index 5fe2120da..75f41f64e 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -320,6 +320,26 @@ path = "compatibility" # `false` allows the OS to manage write operations, which can improve performance. enforce_fsync = false +# Defines the file system confirmation behavior during state updates. +# Controls how the system waits for file write operations to complete. +# Possible values: +# - "wait": waits for the file operation to complete before proceeding. +# - "wait_with_flush": waits for the file operation to complete and ensures data is flushed to disk. +# - "nowait": proceeds without waiting for the file operation to finish, potentially increasing performance but at the cost of durability. +server_confirmation = "wait" + +# Timeout for idle state, after which the task will be considered inactive. +# The task will be notified if it remains idle for this duration. +idle_timeout = "5 m" + +# Maximum number of retries for a failed file operation (e.g., append, overwrite). +# This defines how many times the system will attempt the operation before failing. +max_file_operation_retries = 1 + +# Delay between retries in case of a failed file operation. +# This helps to avoid immediate repeated attempts and can reduce load. +retry_delay = "1 s" + # Runtime configuration. [system.runtime] # Path for storing runtime data. @@ -420,7 +440,7 @@ validate_checksum = false # The threshold of buffered messages before triggering a save to disk (integer). # Specifies how many messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. -messages_required_to_save = 5000 +messages_required_to_save = 500 # Segment configuration [system.segment] diff --git a/integration/tests/state/mod.rs b/integration/tests/state/mod.rs index 588163120..149f780c3 100644 --- a/integration/tests/state/mod.rs +++ b/integration/tests/state/mod.rs @@ -1,4 +1,5 @@ use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; +use server::configs::system::SystemConfig; use server::state::file::FileState; use server::streaming::persistence::persister::FilePersister; use server::versioning::SemanticVersion; @@ -31,7 +32,9 @@ impl StateSetup { create_dir(&directory_path).await.unwrap(); let version = SemanticVersion::from_str("1.2.3").unwrap(); - let persister = FilePersister {}; + + let config = SystemConfig::default(); + let persister = FilePersister::new(Arc::new(config)).await; let encryptor: Option> = match encryption_key { Some(key) => Some(Arc::new(Aes256GcmEncryptor::new(key).unwrap())), None => None, diff --git a/integration/tests/streaming/common/test_setup.rs b/integration/tests/streaming/common/test_setup.rs index 7585ee60c..2e3eef547 100644 --- a/integration/tests/streaming/common/test_setup.rs +++ b/integration/tests/streaming/common/test_setup.rs @@ -20,7 +20,7 @@ impl TestSetup { let config = Arc::new(config); fs::create_dir(config.get_system_path()).await.unwrap(); - let persister = FilePersister {}; + let persister = FilePersister::new(config.clone()).await; let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister))); TestSetup { config, storage } } diff --git a/integration/tests/streaming/messages.rs b/integration/tests/streaming/messages.rs index eb51f7676..27b5d6470 100644 --- a/integration/tests/streaming/messages.rs +++ b/integration/tests/streaming/messages.rs @@ -114,12 +114,12 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() { partition.partition_id, ); partition - .append_messages(appendable_batch_info, messages) + .append_messages(appendable_batch_info, messages, None) .await .unwrap(); let test_timestamp = IggyTimestamp::now(); partition - .append_messages(appendable_batch_info_two, messages_two) + .append_messages(appendable_batch_info_two, messages_two, None) .await .unwrap(); @@ -217,7 +217,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() { partition.partition_id, ); partition - .append_messages(appendable_batch_info, messages) + .append_messages(appendable_batch_info, messages, None) .await .unwrap(); assert_eq!(partition.unsaved_messages_count, 0); diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs index f52795c68..f5490975e 100644 --- a/integration/tests/streaming/mod.rs +++ b/integration/tests/streaming/mod.rs @@ -5,6 +5,7 @@ mod common; mod consumer_offset; mod messages; mod partition; +mod persistence; mod segment; mod snapshot; mod stream; diff --git a/integration/tests/streaming/partition.rs b/integration/tests/streaming/partition.rs index 59dbe6e49..f89187485 100644 --- a/integration/tests/streaming/partition.rs +++ b/integration/tests/streaming/partition.rs @@ -180,7 +180,7 @@ async fn should_purge_existing_partition_on_disk() { partition.partition_id, ); partition - .append_messages(appendable_batch_info, messages) + .append_messages(appendable_batch_info, messages, None) .await .unwrap(); let loaded_messages = partition.get_messages_by_offset(0, 100).await.unwrap(); diff --git a/integration/tests/streaming/persistence.rs b/integration/tests/streaming/persistence.rs new file mode 100644 index 000000000..33101dc88 --- /dev/null +++ b/integration/tests/streaming/persistence.rs @@ -0,0 +1,106 @@ +use std::{sync::Arc, time::Duration}; + +use server::streaming::persistence::persister::{FilePersister, Persister}; + +use bytes::Bytes; +use iggy::{confirmation::Confirmation, utils::duration::IggyDuration}; +use server::configs::system::SystemConfig; +use tempfile::NamedTempFile; +use tokio::{io::AsyncReadExt, time::sleep}; + +#[tokio::test] +async fn test_append_nowait() { + let config = SystemConfig::default(); + + let temp_out_file = NamedTempFile::new().unwrap(); + let file_path = temp_out_file.path().to_path_buf(); + + let bytes = b"test data"; + + let persister = FilePersister::new(Arc::new(config)).await; + let err = persister + .append( + file_path.to_str().unwrap(), + Bytes::copy_from_slice(bytes), + Some(Confirmation::Nowait), + ) + .await; + assert!(err.is_ok()); + + sleep(Duration::from_millis(100)).await; + + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).await.unwrap(); + + assert_eq!(buffer, bytes); +} + +#[tokio::test] +async fn test_task_removal_on_idle_timeout_and_persistence_of_active_task() { + let mut config = SystemConfig::default(); + config.state.idle_timeout = IggyDuration::new(Duration::from_millis(100)); + + let temp_out_file_1 = NamedTempFile::new().unwrap(); + let file_path_1 = temp_out_file_1.path().to_path_buf(); + let file_path_str_1 = file_path_1.to_str().unwrap(); + + let temp_out_file_2 = NamedTempFile::new().unwrap(); + let file_path_2 = temp_out_file_2.path().to_path_buf(); + let file_path_str_2 = file_path_2.to_str().unwrap(); + + let persister = FilePersister::new(Arc::new(config)).await; + + assert!( + !persister.is_task_active(file_path_str_1), + "Task 1 should not be active initially" + ); + assert!( + !persister.is_task_active(file_path_str_2), + "Task 2 should not be active initially" + ); + + // Activate the first task by issuing an append command + let err = persister + .append(file_path_str_1, Bytes::new(), Some(Confirmation::Nowait)) + .await; + assert!(err.is_ok()); + assert!( + persister.is_task_active(file_path_str_1), + "Task 1 should be active after appending" + ); + + // Wait 50 ms, then activate the second task to refresh its timeout + sleep(Duration::from_millis(50)).await; + let err = persister + .append(file_path_str_2, Bytes::new(), Some(Confirmation::Nowait)) + .await; + assert!(err.is_ok()); + assert!( + persister.is_task_active(file_path_str_2), + "Task 2 should be active after appending" + ); + + // Wait another 70 ms, so the total time for the first task (120 ms) exceeds idle_timeout, + // but the second task remains active since its timeout was refreshed + sleep(Duration::from_millis(70)).await; + + // Ensure the second task is still active after its recent append + assert!( + persister.is_task_active(file_path_str_2), + "Task 2 should still be active after recent append" + ); + + // Confirm that the first task has been terminated due to idle timeout expiration + assert!( + !persister.is_task_active(file_path_str_1), + "Task 1 should no longer be active after timeout" + ); + + // Wait another 150 ms to confirm that the second task also terminates after its own idle timeout + sleep(Duration::from_millis(150)).await; + assert!( + !persister.is_task_active(file_path_str_2), + "Task 2 should no longer be active after idle timeout" + ); +} diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 70faa72e8..cdaffb9fb 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -172,7 +172,7 @@ async fn should_persist_and_load_segment_with_messages() { .append_batch(batch_size, messages_count as u32, &messages) .await .unwrap(); - segment.persist_messages().await.unwrap(); + segment.persist_messages(None).await.unwrap(); let mut loaded_segment = segment::Segment::create( stream_id, topic_id, @@ -257,7 +257,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { .append_batch(batch_size, messages_count as u32, &messages) .await .unwrap(); - segment.persist_messages().await.unwrap(); + segment.persist_messages(None).await.unwrap(); segment.is_closed = true; let is_expired = segment.is_expired(now).await; @@ -342,7 +342,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() .append_batch(not_expired_message_size, 1, ¬_expired_messages) .await .unwrap(); - segment.persist_messages().await.unwrap(); + segment.persist_messages(None).await.unwrap(); let is_expired = segment.is_expired(now).await; assert!(!is_expired); diff --git a/integration/tests/streaming/snapshot.rs b/integration/tests/streaming/snapshot.rs index 6e86abdc9..10d2821d1 100644 --- a/integration/tests/streaming/snapshot.rs +++ b/integration/tests/streaming/snapshot.rs @@ -14,7 +14,8 @@ async fn should_create_snapshot_file() { setup.config.clone(), DataMaintenanceConfig::default(), PersonalAccessTokenConfig::default(), - ); + ) + .await; system.init().await.unwrap(); diff --git a/integration/tests/streaming/stream.rs b/integration/tests/streaming/stream.rs index 8e2742991..d47952591 100644 --- a/integration/tests/streaming/stream.rs +++ b/integration/tests/streaming/stream.rs @@ -129,7 +129,7 @@ async fn should_purge_existing_stream_on_disk() { .unwrap(); let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); topic - .append_messages(batch_size, Partitioning::partition_id(1), messages) + .append_messages(batch_size, Partitioning::partition_id(1), messages, None) .await .unwrap(); let loaded_messages = topic diff --git a/integration/tests/streaming/system.rs b/integration/tests/streaming/system.rs index c9c53cf5d..2074ca15f 100644 --- a/integration/tests/streaming/system.rs +++ b/integration/tests/streaming/system.rs @@ -13,7 +13,8 @@ async fn should_initialize_system_and_base_directories() { setup.config.clone(), DataMaintenanceConfig::default(), PersonalAccessTokenConfig::default(), - ); + ) + .await; system.init().await.unwrap(); @@ -36,7 +37,8 @@ async fn should_create_and_persist_stream() { setup.config.clone(), DataMaintenanceConfig::default(), PersonalAccessTokenConfig::default(), - ); + ) + .await; let stream_id = 1; let stream_name = "test"; let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); @@ -57,7 +59,8 @@ async fn should_create_and_persist_stream_with_automatically_generated_id() { setup.config.clone(), DataMaintenanceConfig::default(), PersonalAccessTokenConfig::default(), - ); + ) + .await; let stream_id = 1; let stream_name = "test"; let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); @@ -78,7 +81,8 @@ async fn should_delete_persisted_stream() { setup.config.clone(), DataMaintenanceConfig::default(), PersonalAccessTokenConfig::default(), - ); + ) + .await; let stream_id = 1; let stream_name = "test"; let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); diff --git a/integration/tests/streaming/topic.rs b/integration/tests/streaming/topic.rs index 2d24a7636..b36949d80 100644 --- a/integration/tests/streaming/topic.rs +++ b/integration/tests/streaming/topic.rs @@ -205,7 +205,7 @@ async fn should_purge_existing_topic_on_disk() { let messages_count = messages.len(); let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); topic - .append_messages(batch_size, Partitioning::partition_id(1), messages) + .append_messages(batch_size, Partitioning::partition_id(1), messages, None) .await .unwrap(); let loaded_messages = topic diff --git a/integration/tests/streaming/topic_messages.rs b/integration/tests/streaming/topic_messages.rs index 678626c1d..4219242fd 100644 --- a/integration/tests/streaming/topic_messages.rs +++ b/integration/tests/streaming/topic_messages.rs @@ -80,7 +80,7 @@ async fn assert_polling_messages(cache: CacheConfig, expect_enabled_cache: bool) } let batch_size = messages.iter().map(|m| m.get_size_bytes() as u64).sum(); topic - .append_messages(batch_size, partitioning, messages) + .append_messages(batch_size, partitioning, messages, None) .await .unwrap(); @@ -125,6 +125,7 @@ async fn given_key_none_messages_should_be_appended_to_the_next_partition_using_ batch_size, partitioning.clone(), vec![get_message(i as u128, &payload)], + None, ) .await .unwrap(); @@ -150,6 +151,7 @@ async fn given_key_partition_id_messages_should_be_appended_to_the_chosen_partit batch_size, partitioning.clone(), vec![get_message(i as u128, &payload)], + None, ) .await .unwrap(); @@ -179,6 +181,7 @@ async fn given_key_messages_key_messages_should_be_appended_to_the_calculated_pa batch_size, partitioning, vec![get_message(entity_id as u128, &payload)], + None, ) .await .unwrap(); diff --git a/sdk/src/confirmation.rs b/sdk/src/confirmation.rs new file mode 100644 index 000000000..aa299b609 --- /dev/null +++ b/sdk/src/confirmation.rs @@ -0,0 +1,35 @@ +use std::{fmt, str::FromStr}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Default, Deserialize, Serialize, Debug)] +pub enum Confirmation { + #[default] + WaitWithFlush, + Wait, + Nowait, +} + +impl FromStr for Confirmation { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "wait_with_flush" => Ok(Confirmation::WaitWithFlush), + "wait" => Ok(Confirmation::Wait), + "nowait" => Ok(Confirmation::Nowait), + _ => Err(format!("Invalid confirmation type: {}", s)), + } + } +} + +impl fmt::Display for Confirmation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Confirmation::WaitWithFlush => "wait_with_flush", + Confirmation::Wait => "wait", + Confirmation::Nowait => "nowait", + }; + write!(f, "{}", s) + } +} diff --git a/sdk/src/error.rs b/sdk/src/error.rs index d7eae52ea..516c7eeac 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -380,6 +380,8 @@ pub enum IggyError { InvalidConnectionString = 8000, #[error("Snaphot file completion failed")] SnapshotFileCompletionFailed = 9000, + #[error("Failed to queue command")] + CommandQueueError(#[source] anyhow::Error) = 10000, } impl IggyError { diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index b7b59b09e..00cd23d9d 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -13,6 +13,7 @@ pub mod client_provider; pub mod clients; pub mod command; pub mod compression; +pub mod confirmation; pub mod consumer; pub mod consumer_groups; pub mod consumer_offsets; diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs index 5eb60852a..9a99e68c9 100644 --- a/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/server/src/binary/handlers/messages/send_messages_handler.rs @@ -18,8 +18,9 @@ pub async fn handle( let topic_id = command.topic_id; let partitioning = command.partitioning; let messages = command.messages; + // TODO add confirmation system - .append_messages(session, stream_id, topic_id, partitioning, messages) + .append_messages(session, stream_id, topic_id, partitioning, messages, None) .await?; sender.send_empty_ok_response().await?; Ok(()) diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 2a4e8d9dc..bc0f375a0 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -17,6 +17,8 @@ use crate::streaming::streams::stream::Stream; use crate::streaming::systems::info::SystemInfo; use crate::streaming::topics::topic::Topic; use async_trait::async_trait; +use bytes::Bytes; +use iggy::confirmation::Confirmation; use iggy::consumer::ConsumerKind; use iggy::error::IggyError; use std::path::Path; @@ -107,7 +109,12 @@ struct NoopSegmentStorage {} #[async_trait] impl Persister for NoopPersister { - async fn append(&self, _path: &str, _bytes: &[u8]) -> Result<(), IggyError> { + async fn append( + &self, + _path: &str, + _bytes: Bytes, + _confirmation: Option, + ) -> Result<(), IggyError> { Ok(()) } @@ -234,6 +241,7 @@ impl SegmentStorage for NoopSegmentStorage { &self, _segment: &Segment, _batch: RetainedMessageBatch, + _confirmation: Option, ) -> Result { Ok(0) } @@ -259,7 +267,12 @@ impl SegmentStorage for NoopSegmentStorage { Ok(None) } - async fn save_index(&self, _index_path: &str, _index: Index) -> Result<(), IggyError> { + async fn save_index( + &self, + _index_path: &str, + _index: Index, + _confirmation: Option, + ) -> Result<(), IggyError> { Ok(()) } diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index 43728435b..37fe4775a 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -424,6 +424,16 @@ impl Default for StateConfig { fn default() -> StateConfig { StateConfig { enforce_fsync: SERVER_CONFIG.system.state.enforce_fsync, + server_confirmation: SERVER_CONFIG + .system + .state + .server_confirmation + .parse() + .unwrap(), + idle_timeout: SERVER_CONFIG.system.state.idle_timeout.parse().unwrap(), + max_file_operation_retries: SERVER_CONFIG.system.state.max_file_operation_retries + as u32, + retry_delay: SERVER_CONFIG.system.state.retry_delay.parse().unwrap(), } } } diff --git a/server/src/configs/displays.rs b/server/src/configs/displays.rs index bc4094705..811e1bb13 100644 --- a/server/src/configs/displays.rs +++ b/server/src/configs/displays.rs @@ -17,6 +17,8 @@ use crate::configs::{ }; use std::fmt::{Display, Formatter}; +use super::system::StateConfig; + impl Display for HttpConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( @@ -336,11 +338,25 @@ impl Display for TelemetryTracesConfig { } } +impl Display for StateConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ enforce_fsync: {}, server_confirmation: {}, idle_timeout: {}, max_file_operation_retries: {}, retry_delay: {} }}", + self.enforce_fsync, + self.server_confirmation, + self.idle_timeout, + self.max_file_operation_retries, + self.retry_delay, + ) + } +} + impl Display for SystemConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ path: {}, logging: {}, cache: {}, stream: {}, topic: {}, partition: {}, segment: {}, encryption: {} }}", + "{{ path: {}, logging: {}, cache: {}, stream: {}, topic: {}, partition: {}, segment: {}, encryption: {}, state: {} }}", self.path, self.logging, self.cache, @@ -348,7 +364,8 @@ impl Display for SystemConfig { self.topic, self.partition, self.segment, - self.encryption + self.encryption, + self.state, ) } } diff --git a/server/src/configs/system.rs b/server/src/configs/system.rs index 3ebcf3f3c..ef23c0689 100644 --- a/server/src/configs/system.rs +++ b/server/src/configs/system.rs @@ -1,4 +1,5 @@ use crate::configs::resource_quota::MemoryResourceQuota; +use iggy::confirmation::Confirmation; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; use iggy::utils::topic_size::MaxTopicSize; @@ -125,9 +126,17 @@ pub struct SegmentConfig { pub archive_expired: bool, } +#[serde_as] #[derive(Debug, Deserialize, Serialize)] pub struct StateConfig { pub enforce_fsync: bool, + #[serde_as(as = "DisplayFromStr")] + pub server_confirmation: Confirmation, + #[serde_as(as = "DisplayFromStr")] + pub idle_timeout: IggyDuration, + pub max_file_operation_retries: u32, + #[serde_as(as = "DisplayFromStr")] + pub retry_delay: IggyDuration, } impl SystemConfig { diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 6c0769531..bd495fba5 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -76,6 +76,7 @@ async fn send_messages( let topic_id = command.topic_id; let partitioning = command.partitioning; let system = state.system.read().await; + // TODO add confirmation system .append_messages( &Session::stateless(identity.user_id, identity.ip_address), @@ -83,6 +84,7 @@ async fn send_messages( topic_id, partitioning, messages, + None, ) .await?; Ok(StatusCode::CREATED) diff --git a/server/src/main.rs b/server/src/main.rs index 54496e2b9..58bdf1994 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -42,11 +42,14 @@ async fn main() -> Result<(), ServerError> { logging.late_init(config.system.get_system_path(), &config.system.logging)?; - let system = SharedSystem::new(System::new( - config.system.clone(), - config.data_maintenance.clone(), - config.personal_access_token.clone(), - )); + let system = SharedSystem::new( + System::new( + config.system.clone(), + config.data_maintenance.clone(), + config.personal_access_token.clone(), + ) + .await, + ); // Workaround to ensure that the statistics are initialized before the server // loads streams and starts accepting connections. This is necessary to diff --git a/server/src/state/file.rs b/server/src/state/file.rs index 2f99b72be..1d2b17343 100644 --- a/server/src/state/file.rs +++ b/server/src/state/file.rs @@ -252,7 +252,7 @@ impl State for FileState { ); let bytes = entry.to_bytes(); self.entries_count.fetch_add(1, Ordering::SeqCst); - self.persister.append(&self.path, &bytes).await?; + self.persister.append(&self.path, bytes, None).await?; debug!("Applied state entry: {entry}"); Ok(()) } diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 42c5e011d..3d3282fd4 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -4,6 +4,7 @@ use crate::streaming::models::messages::RetainedMessage; use crate::streaming::partitions::partition::Partition; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::segments::segment::Segment; +use iggy::confirmation::Confirmation; use iggy::messages::send_messages::Message; use iggy::models::messages::POLLED_MESSAGE_METADATA; use iggy::utils::timestamp::IggyTimestamp; @@ -378,6 +379,7 @@ impl Partition { &mut self, appendable_batch_info: AppendableBatchInfo, messages: Vec, + confirmation: Option, ) -> Result<(), IggyError> { { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; @@ -482,7 +484,7 @@ impl Partition { self.partition_id ); - last_segment.persist_messages().await.unwrap(); + last_segment.persist_messages(confirmation).await.unwrap(); self.unsaved_messages_count = 0; } } @@ -506,7 +508,7 @@ impl Partition { // Make sure all of the messages from the accumulator are persisted // no leftover from one round trip. while last_segment.unsaved_messages.is_some() { - last_segment.persist_messages().await.unwrap(); + last_segment.persist_messages(None).await.unwrap(); } self.unsaved_messages_count = 0; Ok(()) @@ -552,7 +554,7 @@ mod tests { partition_id: partition.partition_id, }; partition - .append_messages(appendable_batch_info, messages) + .append_messages(appendable_batch_info, messages, None) .await .unwrap(); @@ -574,7 +576,7 @@ mod tests { partition_id: partition.partition_id, }; partition - .append_messages(appendable_batch_info, messages) + .append_messages(appendable_batch_info, messages, None) .await .unwrap(); diff --git a/server/src/streaming/persistence/persister.rs b/server/src/streaming/persistence/persister.rs index ced14fc6b..b64596b7a 100644 --- a/server/src/streaming/persistence/persister.rs +++ b/server/src/streaming/persistence/persister.rs @@ -1,13 +1,28 @@ +use crate::configs::system::SystemConfig; use crate::streaming::utils::file; +use anyhow::Context; use async_trait::async_trait; +use bytes::Bytes; +use dashmap::DashMap; +use flume::{unbounded, Receiver, Sender}; +use iggy::confirmation::Confirmation; use iggy::error::IggyError; use std::fmt::Debug; -use tokio::fs; +use std::sync::Arc; use tokio::io::AsyncWriteExt; +use tokio::time::{Duration, Instant}; +use tokio::{fs, sync::oneshot}; +use tokio::{task, time}; +use tracing::error; #[async_trait] pub trait Persister: Sync + Send { - async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError>; + async fn append( + &self, + path: &str, + bytes: Bytes, + confirmation: Option, + ) -> Result<(), IggyError>; async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError>; async fn delete(&self, path: &str) -> Result<(), IggyError>; } @@ -21,50 +36,194 @@ impl Debug for dyn Persister { } #[derive(Debug)] -pub struct FilePersister; +enum FilePersisterCommand { + Append(Bytes), +} #[derive(Debug)] -pub struct FileWithSyncPersister; +struct FilePersisterTask { + sender: Sender, +} + +impl FilePersisterTask { + async fn new( + path: String, + idle_timeout: Duration, + idle_notifier: Sender, + max_retries: u32, + retry_sleep: Duration, + ) -> Self { + let (sender, receiver): (Sender, Receiver) = + unbounded(); + let (completion_sender, completion_receiver): ( + oneshot::Sender, + oneshot::Receiver, + ) = oneshot::channel(); + + task::spawn(async move { + let mut idle_interval = time::interval(idle_timeout); + let mut last_used = Instant::now(); + + completion_sender.send(1).unwrap(); + + loop { + tokio::select! { + command = receiver.recv_async() => { + last_used = Instant::now(); + let file_operation = match command { + Ok(file_operation) => file_operation, + Err(e) => { + error!("Error receiving command: {}", e); + continue + } + }; + + let mut retries = 0; + while retries < max_retries { + match FilePersisterTask::handle_file_operation(&file_operation, &path).await { + Ok(_) => break, + Err(e) => { + error!("File operation failed: {:?}", e); + retries += 1; + tokio::time::sleep(retry_sleep).await; + } + }; + } + }, + _ = idle_interval.tick() => { + if last_used.elapsed() > idle_timeout { + if let Err(e) = idle_notifier.send_async(path.to_string()).await { + error!("Error sending idle notification: {:?}", e); + }; + break + } + } + } + } + }); + completion_receiver.await.unwrap(); + + FilePersisterTask { sender } + } + + async fn handle_file_operation( + file_operation: &FilePersisterCommand, + path: &str, + ) -> Result<(), std::io::Error> { + match file_operation { + FilePersisterCommand::Append(bytes) => { + let mut file = file::append(path).await?; + file.write_all(bytes).await?; + } + } + Ok(()) + } +} + +#[derive(Debug)] +pub struct FilePersister { + path_to_task_map: Arc>>, + idle_notifier: Sender, + config: Arc, +} unsafe impl Send for FilePersister {} unsafe impl Sync for FilePersister {} -unsafe impl Send for FileWithSyncPersister {} -unsafe impl Sync for FileWithSyncPersister {} +impl FilePersister { + pub async fn new(config: Arc) -> Self { + let (sender, receiver): (Sender, Receiver) = unbounded(); + let path_to_task_map = Arc::new(DashMap::new()); -#[async_trait] -impl Persister for FilePersister { - async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { - let mut file = file::append(path).await?; - file.write_all(bytes).await?; - Ok(()) + let persister = FilePersister { + path_to_task_map: path_to_task_map.clone(), + idle_notifier: sender, + config, + }; + + let map_clone = Arc::clone(&path_to_task_map); + + task::spawn(async move { + while let Ok(idle_path) = receiver.recv_async().await { + map_clone.remove(&idle_path); + } + }); + persister } +} - async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { - let mut file = file::overwrite(path).await?; - file.write_all(bytes).await?; - Ok(()) +impl FilePersister { + async fn ensure_task(&self, path: &str) -> Arc { + if let Some(task) = self.path_to_task_map.get(path) { + task.clone() + } else { + let new_task = Arc::new( + FilePersisterTask::new( + path.to_string(), + self.config.state.idle_timeout.get_duration(), + self.idle_notifier.clone(), + self.config.state.max_file_operation_retries, + self.config.state.retry_delay.get_duration(), + ) + .await, + ); + let task_entry = self + .path_to_task_map + .entry(path.to_string()) + .or_insert(new_task); + task_entry.clone() + } } - async fn delete(&self, path: &str) -> Result<(), IggyError> { - fs::remove_file(path).await?; - Ok(()) + pub fn is_task_active(&self, path: &str) -> bool { + self.path_to_task_map.contains_key(path) } } #[async_trait] -impl Persister for FileWithSyncPersister { - async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { - let mut file = file::append(path).await?; - file.write_all(bytes).await?; - file.sync_all().await?; +impl Persister for FilePersister { + async fn append( + &self, + path: &str, + bytes: Bytes, + confirmation: Option, + ) -> Result<(), IggyError> { + let confirmation = confirmation.unwrap_or(self.config.state.server_confirmation.clone()); + + match confirmation { + Confirmation::WaitWithFlush => { + let mut file = file::append(path).await?; + file.write_all(&bytes).await?; + file.sync_all().await?; + } + Confirmation::Wait => { + let mut file = file::append(path).await?; + file.write_all(&bytes).await?; + } + Confirmation::Nowait => { + let task = self.ensure_task(path).await; + task.sender + .send_async(FilePersisterCommand::Append(bytes)) + .await + .with_context(|| format!("Failed to queue append command for file: {}", path)) + .map_err(IggyError::CommandQueueError)?; + } + } Ok(()) } async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { - let mut file = file::overwrite(path).await?; - file.write_all(bytes).await?; - file.sync_all().await?; + match self.config.state.server_confirmation { + Confirmation::WaitWithFlush => { + let mut file = file::overwrite(path).await?; + file.write_all(bytes).await?; + file.sync_all().await?; + } + _ => { + let mut file = file::overwrite(path).await?; + file.write_all(bytes).await?; + } + } Ok(()) } diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index e519b77c8..75460c56d 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -5,6 +5,7 @@ use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::sizeable::Sizeable; +use iggy::confirmation::Confirmation; use iggy::error::IggyError; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -236,7 +237,10 @@ impl Segment { index } - pub async fn persist_messages(&mut self) -> Result { + pub async fn persist_messages( + &mut self, + confirmation: Option, + ) -> Result { let storage = self.storage.segment.clone(); if self.unsaved_messages.is_none() { return Ok(0); @@ -264,8 +268,12 @@ impl Segment { if has_remainder { self.unsaved_messages = Some(batch_accumulator); } - let saved_bytes = storage.save_batches(self, batch).await?; - storage.save_index(&self.index_path, index).await?; + let saved_bytes = storage + .save_batches(self, batch, confirmation.clone()) + .await?; + storage + .save_index(&self.index_path, index, confirmation) + .await?; self.last_index_position += batch_size; self.size_bytes += RETAINED_BATCH_OVERHEAD; self.size_of_parent_stream diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 3cfd44e1b..2417c1cc1 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -11,6 +11,7 @@ use crate::streaming::utils::head_tail_buf::HeadTailBuffer; use anyhow::Context; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; +use iggy::confirmation::Confirmation; use iggy::error::IggyError; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::checksum; @@ -211,6 +212,7 @@ impl SegmentStorage for FileSegmentStorage { &self, segment: &Segment, batch: RetainedMessageBatch, + confirmation: Option, ) -> Result { let batch_size = batch.get_size_bytes(); let mut bytes = BytesMut::with_capacity(batch_size as usize); @@ -218,7 +220,7 @@ impl SegmentStorage for FileSegmentStorage { if let Err(err) = self .persister - .append(&segment.log_path, &bytes) + .append(&segment.log_path, bytes.into(), confirmation) .await .with_context(|| format!("Failed to save messages to segment: {}", segment.log_path)) { @@ -394,14 +396,19 @@ impl SegmentStorage for FileSegmentStorage { Ok(Some(index_range)) } - async fn save_index(&self, index_path: &str, index: Index) -> Result<(), IggyError> { + async fn save_index( + &self, + index_path: &str, + index: Index, + confirmation: Option, + ) -> Result<(), IggyError> { let mut bytes = BytesMut::with_capacity(INDEX_SIZE as usize); bytes.put_u32_le(index.offset); bytes.put_u32_le(index.position); bytes.put_u64_le(index.timestamp); if let Err(err) = self .persister - .append(index_path, &bytes) + .append(index_path, bytes.into(), confirmation) .await .with_context(|| format!("Failed to save index to segment: {}", index_path)) { diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index b8a39330e..2b5ead91c 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -14,6 +14,7 @@ use crate::streaming::systems::storage::FileSystemInfoStorage; use crate::streaming::topics::storage::FileTopicStorage; use crate::streaming::topics::topic::Topic; use async_trait::async_trait; +use iggy::confirmation::Confirmation; use iggy::consumer::ConsumerKind; use iggy::error::IggyError; use std::fmt::{Debug, Formatter}; @@ -74,6 +75,7 @@ pub trait SegmentStorage: Send + Sync { &self, segment: &Segment, batch: RetainedMessageBatch, + confirmation: Option, ) -> Result; async fn load_message_ids(&self, segment: &Segment) -> Result, IggyError>; async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>; @@ -84,7 +86,12 @@ pub trait SegmentStorage: Send + Sync { index_start_offset: u64, index_end_offset: u64, ) -> Result, IggyError>; - async fn save_index(&self, index_path: &str, index: Index) -> Result<(), IggyError>; + async fn save_index( + &self, + index_path: &str, + index: Index, + confirmation: Option, + ) -> Result<(), IggyError>; async fn try_load_index_for_timestamp( &self, segment: &Segment, @@ -157,6 +164,7 @@ pub(crate) mod tests { use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; use async_trait::async_trait; + use bytes::Bytes; use std::sync::Arc; struct TestPersister {} @@ -168,7 +176,12 @@ pub(crate) mod tests { #[async_trait] impl Persister for TestPersister { - async fn append(&self, _path: &str, _bytes: &[u8]) -> Result<(), IggyError> { + async fn append( + &self, + _path: &str, + _bytes: Bytes, + _confirmation: Option, + ) -> Result<(), IggyError> { Ok(()) } @@ -295,6 +308,7 @@ pub(crate) mod tests { &self, _segment: &Segment, _batch: RetainedMessageBatch, + _confirmation: Option, ) -> Result { Ok(0) } @@ -320,7 +334,12 @@ pub(crate) mod tests { Ok(None) } - async fn save_index(&self, _index_path: &str, _index: Index) -> Result<(), IggyError> { + async fn save_index( + &self, + _index_path: &str, + _index: Index, + _confirmation: Option, + ) -> Result<(), IggyError> { Ok(()) } diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 33bf2b930..7677f5e84 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -2,6 +2,7 @@ use crate::streaming::cache::memory_tracker::CacheMemoryTracker; use crate::streaming::session::Session; use crate::streaming::systems::system::System; use bytes::Bytes; +use iggy::confirmation::Confirmation; use iggy::consumer::Consumer; use iggy::messages::poll_messages::PollingStrategy; use iggy::messages::send_messages::Message; @@ -91,6 +92,7 @@ impl System { topic_id: Identifier, partitioning: Partitioning, messages: Vec, + confirmation: Option, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; let topic = self.find_topic(session, &stream_id, &topic_id)?; @@ -128,7 +130,7 @@ impl System { } let messages_count = messages.len() as u64; topic - .append_messages(batch_size_bytes, partitioning, messages) + .append_messages(batch_size_bytes, partitioning, messages, confirmation) .await?; self.metrics.increment_messages(messages_count); Ok(()) diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index 72df3f4c9..33ea5e981 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -81,7 +81,7 @@ pub struct System { const CACHE_OVER_EVICTION_FACTOR: u64 = 5; impl System { - pub fn new( + pub async fn new( config: Arc, data_maintenance_config: DataMaintenanceConfig, pat_config: PersonalAccessTokenConfig, @@ -99,8 +99,8 @@ impl System { false => None, }; - let state_persister = Self::resolve_persister(config.state.enforce_fsync); - let partition_persister = Self::resolve_persister(config.partition.enforce_fsync); + let state_persister = Arc::new(FilePersister::new(config.clone()).await); + let partition_persister = Arc::new(FilePersister::new(config.clone()).await); let state = Arc::new(FileState::new( &config.get_state_log_path(), @@ -118,13 +118,6 @@ impl System { ) } - fn resolve_persister(enforce_fsync: bool) -> Arc { - match enforce_fsync { - true => Arc::new(FileWithSyncPersister), - false => Arc::new(FilePersister), - } - } - pub fn create( system_config: Arc, storage: SystemStorage, diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 7187d4cd1..b67372dec 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -5,6 +5,7 @@ use crate::streaming::sizeable::Sizeable; use crate::streaming::topics::topic::Topic; use crate::streaming::utils::file::folder_size; use crate::streaming::utils::hash; +use iggy::confirmation::Confirmation; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::messages::poll_messages::{PollingKind, PollingStrategy}; @@ -73,6 +74,7 @@ impl Topic { batch_size: u64, partitioning: Partitioning, messages: Vec, + confirmation: Option, ) -> Result<(), IggyError> { if !self.has_partitions() { return Err(IggyError::NoPartitions(self.topic_id, self.stream_id)); @@ -97,7 +99,7 @@ impl Topic { }; let appendable_batch_info = AppendableBatchInfo::new(batch_size, partition_id); - self.append_messages_to_partition(appendable_batch_info, messages) + self.append_messages_to_partition(appendable_batch_info, messages, confirmation) .await } @@ -121,6 +123,7 @@ impl Topic { &self, appendable_batch_info: AppendableBatchInfo, messages: Vec, + confirmation: Option, ) -> Result<(), IggyError> { let partition = self.partitions.get(&appendable_batch_info.partition_id); partition @@ -133,7 +136,7 @@ impl Topic { })? .write() .await - .append_messages(appendable_batch_info, messages) + .append_messages(appendable_batch_info, messages, confirmation) .await?; Ok(()) @@ -308,7 +311,7 @@ mod tests { let messages = vec![Message::new(Some(entity_id as u128), Bytes::new(), None)]; let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); topic - .append_messages(batch_size, partitioning.clone(), messages) + .append_messages(batch_size, partitioning.clone(), messages, None) .await .unwrap(); } @@ -337,7 +340,7 @@ mod tests { let messages = vec![Message::new(Some(entity_id as u128), Bytes::new(), None)]; let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); topic - .append_messages(batch_size, partitioning, messages) + .append_messages(batch_size, partitioning, messages, None) .await .unwrap(); } diff --git a/server/src/streaming/topics/persistence.rs b/server/src/streaming/topics/persistence.rs index ecb63d7fa..cac240fbf 100644 --- a/server/src/streaming/topics/persistence.rs +++ b/server/src/streaming/topics/persistence.rs @@ -28,7 +28,7 @@ impl Topic { for partition in self.get_partitions() { let mut partition = partition.write().await; for segment in partition.get_segments_mut() { - saved_messages_number += segment.persist_messages().await?; + saved_messages_number += segment.persist_messages(None).await?; } }