Add support for "nowait" mode in file synchronization
haze518 committed Nov 12, 2024
1 parent 4b40506 commit afd2fdc
Showing 34 changed files with 514 additions and 86 deletions.
6 changes: 5 additions & 1 deletion configs/server.json
@@ -143,7 +143,11 @@
}
},
"state": {
"enforce_fsync": false
"enforce_fsync": false,
"idle_timeout": "5 m",
"server_confirmation": "wait",
"max_file_operation_retries": 1,
"retry_delay": "1 s"
},
"runtime": {
"path": "runtime"
22 changes: 21 additions & 1 deletion configs/server.toml
@@ -320,6 +320,26 @@ path = "compatibility"
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false

# Defines the file system confirmation behavior during state updates.
# Controls how the system waits for file write operations to complete.
# Possible values:
# - "wait": waits for the file operation to complete before proceeding.
# - "wait_with_flush": waits for the file operation to complete and ensures data is flushed to disk.
# - "nowait": proceeds without waiting for the file operation to finish, potentially increasing performance but at the cost of durability.
server_confirmation = "wait"

# Timeout for idle state, after which the task will be considered inactive.
# The task will be notified if it remains idle for this duration.
idle_timeout = "5 m"

# Maximum number of retries for a failed file operation (e.g., append, overwrite).
# This defines how many times the system will attempt the operation before failing.
max_file_operation_retries = 1

# Delay between retries in case of a failed file operation.
# This helps to avoid immediate repeated attempts and can reduce load.
retry_delay = "1 s"

# Runtime configuration.
[system.runtime]
# Path for storing runtime data.
@@ -420,7 +440,7 @@ validate_checksum = false
# The threshold of buffered messages before triggering a save to disk (integer).
# Specifies how many messages accumulate before persisting to storage.
# Adjusting this can balance between write performance and data durability.
messages_required_to_save = 5000
messages_required_to_save = 500

# Segment configuration
[system.segment]
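The new state options above map onto the per-call Confirmation argument that this commit threads through the persistence layer (see sdk/src/confirmation.rs and integration/tests/streaming/persistence.rs below). The following is a minimal sketch of the nowait path, closely mirroring the test_append_nowait integration test; the test name here is invented, and the claim that passing None falls back to the configured server_confirmation mode is an inference, not something this diff states.

use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use iggy::confirmation::Confirmation;
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::{FilePersister, Persister};
use tempfile::NamedTempFile;
use tokio::time::sleep;

#[tokio::test]
async fn append_with_nowait_sketch() {
    // SystemConfig::default() carries the [system.state] settings shown above
    // (server_confirmation, idle_timeout, max_file_operation_retries, retry_delay).
    let config = Arc::new(SystemConfig::default());
    let persister = FilePersister::new(config).await;

    let file = NamedTempFile::new().unwrap();
    let path = file.path().to_str().unwrap();

    // Nowait returns as soon as the write is handed off to the background task;
    // "wait" / "wait_with_flush" block until the write (and flush) completes.
    // Assumption: None falls back to the configured server_confirmation mode.
    let result = persister
        .append(path, Bytes::from_static(b"state entry"), Some(Confirmation::Nowait))
        .await;
    assert!(result.is_ok());

    // The call did not wait for the write, so give the background task a moment
    // before reading the file back (the integration test sleeps 100 ms as well).
    sleep(Duration::from_millis(100)).await;
    let on_disk = tokio::fs::read(path).await.unwrap();
    assert_eq!(on_disk, b"state entry");
}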
5 changes: 4 additions & 1 deletion integration/tests/state/mod.rs
@@ -1,4 +1,5 @@
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use server::configs::system::SystemConfig;
use server::state::file::FileState;
use server::streaming::persistence::persister::FilePersister;
use server::versioning::SemanticVersion;
@@ -31,7 +32,9 @@ impl StateSetup {
create_dir(&directory_path).await.unwrap();

let version = SemanticVersion::from_str("1.2.3").unwrap();
let persister = FilePersister {};

let config = SystemConfig::default();
let persister = FilePersister::new(Arc::new(config)).await;
let encryptor: Option<Arc<dyn Encryptor>> = match encryption_key {
Some(key) => Some(Arc::new(Aes256GcmEncryptor::new(key).unwrap())),
None => None,
2 changes: 1 addition & 1 deletion integration/tests/streaming/common/test_setup.rs
@@ -20,7 +20,7 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = FilePersister {};
let persister = FilePersister::new(config.clone()).await;
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
6 changes: 3 additions & 3 deletions integration/tests/streaming/messages.rs
@@ -114,12 +114,12 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let test_timestamp = IggyTimestamp::now();
partition
.append_messages(appendable_batch_info_two, messages_two)
.append_messages(appendable_batch_info_two, messages_two, None)
.await
.unwrap();

@@ -217,7 +217,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
assert_eq!(partition.unsaved_messages_count, 0);
1 change: 1 addition & 0 deletions integration/tests/streaming/mod.rs
@@ -5,6 +5,7 @@ mod common;
mod consumer_offset;
mod messages;
mod partition;
mod persistence;
mod segment;
mod snapshot;
mod stream;
2 changes: 1 addition & 1 deletion integration/tests/streaming/partition.rs
@@ -180,7 +180,7 @@ async fn should_purge_existing_partition_on_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let loaded_messages = partition.get_messages_by_offset(0, 100).await.unwrap();
106 changes: 106 additions & 0 deletions integration/tests/streaming/persistence.rs
@@ -0,0 +1,106 @@
use std::{sync::Arc, time::Duration};

use server::streaming::persistence::persister::{FilePersister, Persister};

use bytes::Bytes;
use iggy::{confirmation::Confirmation, utils::duration::IggyDuration};
use server::configs::system::SystemConfig;
use tempfile::NamedTempFile;
use tokio::{io::AsyncReadExt, time::sleep};

#[tokio::test]
async fn test_append_nowait() {
let config = SystemConfig::default();

let temp_out_file = NamedTempFile::new().unwrap();
let file_path = temp_out_file.path().to_path_buf();

let bytes = b"test data";

let persister = FilePersister::new(Arc::new(config)).await;
let err = persister
.append(
file_path.to_str().unwrap(),
Bytes::copy_from_slice(bytes),
Some(Confirmation::Nowait),
)
.await;
assert!(err.is_ok());

sleep(Duration::from_millis(100)).await;

let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await.unwrap();

assert_eq!(buffer, bytes);
}

#[tokio::test]
async fn test_task_removal_on_idle_timeout_and_persistence_of_active_task() {
let mut config = SystemConfig::default();
config.state.idle_timeout = IggyDuration::new(Duration::from_millis(100));

let temp_out_file_1 = NamedTempFile::new().unwrap();
let file_path_1 = temp_out_file_1.path().to_path_buf();
let file_path_str_1 = file_path_1.to_str().unwrap();

let temp_out_file_2 = NamedTempFile::new().unwrap();
let file_path_2 = temp_out_file_2.path().to_path_buf();
let file_path_str_2 = file_path_2.to_str().unwrap();

let persister = FilePersister::new(Arc::new(config)).await;

assert!(
!persister.is_task_active(file_path_str_1),
"Task 1 should not be active initially"
);
assert!(
!persister.is_task_active(file_path_str_2),
"Task 2 should not be active initially"
);

// Activate the first task by issuing an append command
let err = persister
.append(file_path_str_1, Bytes::new(), Some(Confirmation::Nowait))
.await;
assert!(err.is_ok());
assert!(
persister.is_task_active(file_path_str_1),
"Task 1 should be active after appending"
);

// Wait 50 ms, then activate the second task to refresh its timeout
sleep(Duration::from_millis(50)).await;
let err = persister
.append(file_path_str_2, Bytes::new(), Some(Confirmation::Nowait))
.await;
assert!(err.is_ok());
assert!(
persister.is_task_active(file_path_str_2),
"Task 2 should be active after appending"
);

// Wait another 70 ms, so the total time for the first task (120 ms) exceeds idle_timeout,
// but the second task remains active since its timeout was refreshed
sleep(Duration::from_millis(70)).await;

// Ensure the second task is still active after its recent append
assert!(
persister.is_task_active(file_path_str_2),
"Task 2 should still be active after recent append"
);

// Confirm that the first task has been terminated due to idle timeout expiration
assert!(
!persister.is_task_active(file_path_str_1),
"Task 1 should no longer be active after timeout"
);

// Wait another 150 ms to confirm that the second task also terminates after its own idle timeout
sleep(Duration::from_millis(150)).await;
assert!(
!persister.is_task_active(file_path_str_2),
"Task 2 should no longer be active after idle timeout"
);
}
6 changes: 3 additions & 3 deletions integration/tests/streaming/segment.rs
@@ -172,7 +172,7 @@ async fn should_persist_and_load_segment_with_messages() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
@@ -257,7 +257,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();

segment.is_closed = true;
let is_expired = segment.is_expired(now).await;
@@ -342,7 +342,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.append_batch(not_expired_message_size, 1, &not_expired_messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();

let is_expired = segment.is_expired(now).await;
assert!(!is_expired);
3 changes: 2 additions & 1 deletion integration/tests/streaming/snapshot.rs
@@ -14,7 +14,8 @@ async fn should_create_snapshot_file() {
setup.config.clone(),
DataMaintenanceConfig::default(),
PersonalAccessTokenConfig::default(),
);
)
.await;

system.init().await.unwrap();

2 changes: 1 addition & 1 deletion integration/tests/streaming/stream.rs
@@ -129,7 +129,7 @@ async fn should_purge_existing_stream_on_disk() {
.unwrap();
let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum();
topic
.append_messages(batch_size, Partitioning::partition_id(1), messages)
.append_messages(batch_size, Partitioning::partition_id(1), messages, None)
.await
.unwrap();
let loaded_messages = topic
12 changes: 8 additions & 4 deletions integration/tests/streaming/system.rs
@@ -13,7 +13,8 @@ async fn should_initialize_system_and_base_directories() {
setup.config.clone(),
DataMaintenanceConfig::default(),
PersonalAccessTokenConfig::default(),
);
)
.await;

system.init().await.unwrap();

@@ -36,7 +37,8 @@ async fn should_create_and_persist_stream() {
setup.config.clone(),
DataMaintenanceConfig::default(),
PersonalAccessTokenConfig::default(),
);
)
.await;
let stream_id = 1;
let stream_name = "test";
let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
@@ -57,7 +59,8 @@ async fn should_create_and_persist_stream_with_automatically_generated_id() {
setup.config.clone(),
DataMaintenanceConfig::default(),
PersonalAccessTokenConfig::default(),
);
)
.await;
let stream_id = 1;
let stream_name = "test";
let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
@@ -78,7 +81,8 @@ async fn should_delete_persisted_stream() {
setup.config.clone(),
DataMaintenanceConfig::default(),
PersonalAccessTokenConfig::default(),
);
)
.await;
let stream_id = 1;
let stream_name = "test";
let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
2 changes: 1 addition & 1 deletion integration/tests/streaming/topic.rs
@@ -205,7 +205,7 @@ async fn should_purge_existing_topic_on_disk() {
let messages_count = messages.len();
let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum();
topic
.append_messages(batch_size, Partitioning::partition_id(1), messages)
.append_messages(batch_size, Partitioning::partition_id(1), messages, None)
.await
.unwrap();
let loaded_messages = topic
5 changes: 4 additions & 1 deletion integration/tests/streaming/topic_messages.rs
@@ -80,7 +80,7 @@ async fn assert_polling_messages(cache: CacheConfig, expect_enabled_cache: bool)
}
let batch_size = messages.iter().map(|m| m.get_size_bytes() as u64).sum();
topic
.append_messages(batch_size, partitioning, messages)
.append_messages(batch_size, partitioning, messages, None)
.await
.unwrap();

@@ -125,6 +125,7 @@ async fn given_key_none_messages_should_be_appended_to_the_next_partition_using_
batch_size,
partitioning.clone(),
vec![get_message(i as u128, &payload)],
None,
)
.await
.unwrap();
@@ -150,6 +151,7 @@ async fn given_key_partition_id_messages_should_be_appended_to_the_chosen_partit
batch_size,
partitioning.clone(),
vec![get_message(i as u128, &payload)],
None,
)
.await
.unwrap();
@@ -179,6 +181,7 @@ async fn given_key_messages_key_messages_should_be_appended_to_the_calculated_pa
batch_size,
partitioning,
vec![get_message(entity_id as u128, &payload)],
None,
)
.await
.unwrap();
35 changes: 35 additions & 0 deletions sdk/src/confirmation.rs
@@ -0,0 +1,35 @@
use std::{fmt, str::FromStr};

use serde::{Deserialize, Serialize};

#[derive(Clone, Default, Deserialize, Serialize, Debug)]
pub enum Confirmation {
#[default]
WaitWithFlush,
Wait,
Nowait,
}

impl FromStr for Confirmation {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"wait_with_flush" => Ok(Confirmation::WaitWithFlush),
"wait" => Ok(Confirmation::Wait),
"nowait" => Ok(Confirmation::Nowait),
_ => Err(format!("Invalid confirmation type: {}", s)),
}
}
}

impl fmt::Display for Confirmation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
Confirmation::WaitWithFlush => "wait_with_flush",
Confirmation::Wait => "wait",
Confirmation::Nowait => "nowait",
};
write!(f, "{}", s)
}
}
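The strings accepted by FromStr above are the same ones documented for server_confirmation in configs/server.toml. A short usage sketch follows; only the derives shown above are assumed, so the assertions compare through the Display form rather than requiring PartialEq.

use std::str::FromStr;

use iggy::confirmation::Confirmation;

fn main() {
    // Parsing is case-insensitive because FromStr lowercases its input first.
    let parsed = Confirmation::from_str("NoWait").unwrap();
    assert_eq!(parsed.to_string(), "nowait");

    // WaitWithFlush is the #[default] variant of the enum itself;
    // note that the shipped server.toml still sets server_confirmation = "wait" explicitly.
    assert_eq!(Confirmation::default().to_string(), "wait_with_flush");

    // Anything else is rejected with a descriptive error string.
    assert!(Confirmation::from_str("async").is_err());
}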
2 changes: 2 additions & 0 deletions sdk/src/error.rs
@@ -380,6 +380,8 @@ pub enum IggyError {
InvalidConnectionString = 8000,
#[error("Snaphot file completion failed")]
SnapshotFileCompletionFailed = 9000,
#[error("Failed to queue command")]
CommandQueueError(#[source] anyhow::Error) = 10000,
}

impl IggyError {