Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "no_wait" mode for file synchronization #1425

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ path = "compatibility"
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false

# Maximum number of retries for a failed file operation (e.g., append, overwrite).
# This defines how many times the system will attempt the operation before failing.
max_file_operation_retries = 1

# Delay between retries in case of a failed file operation.
# This helps to avoid immediate repeated attempts and can reduce load.
retry_delay = "1 s"

# Runtime configuration.
[system.runtime]
# Path for storing runtime data.
Expand Down Expand Up @@ -452,6 +460,13 @@ size = "1 GB"
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "none"

# Defines the file system confirmation behavior during state updates.
# Controls how the system waits for file write operations to complete.
# Possible values:
# - "wait": waits for the file operation to complete before proceeding.
# - "nowait": proceeds without waiting for the file operation to finish, potentially increasing performance but at the cost of durability.
server_confirmation = "wait"

# Configures whether expired segments are archived (boolean) or just deleted without archiving.
archive_expired = false

Expand Down
15 changes: 9 additions & 6 deletions integration/tests/streaming/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
Expand Down Expand Up @@ -119,12 +120,12 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let test_timestamp = IggyTimestamp::now();
partition
.append_messages(appendable_batch_info_two, messages_two)
.append_messages(appendable_batch_info_two, messages_two, None)
.await
.unwrap();

Expand Down Expand Up @@ -183,7 +184,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
Expand Down Expand Up @@ -229,7 +231,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
assert_eq!(partition.unsaved_messages_count, 0);
Expand All @@ -249,7 +251,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
);
)
.await;
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
Expand Down
17 changes: 11 additions & 6 deletions integration/tests/streaming/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ async fn should_persist_partition_with_segment() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

partition.persist().await.unwrap();

Expand Down Expand Up @@ -66,7 +67,8 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;

Expand All @@ -85,7 +87,8 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
);
)
.await;
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
Expand Down Expand Up @@ -139,7 +142,8 @@ async fn should_delete_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;

Expand Down Expand Up @@ -172,7 +176,8 @@ async fn should_purge_existing_partition_on_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;
let messages = create_messages();
Expand All @@ -185,7 +190,7 @@ async fn should_purge_existing_partition_on_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let loaded_messages = partition.get_messages_by_offset(0, 100).await.unwrap();
Expand Down
117 changes: 107 additions & 10 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::confirmation::Confirmation;
use iggy::models::messages::{MessageState, PolledMessage};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
Expand All @@ -11,7 +12,9 @@ use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::time::sleep;

#[tokio::test]
async fn should_persist_segment() {
Expand All @@ -35,7 +38,8 @@ async fn should_persist_segment() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -73,7 +77,8 @@ async fn should_load_existing_segment_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;
setup
.create_partition_directory(stream_id, topic_id, partition_id)
.await;
Expand All @@ -100,7 +105,8 @@ async fn should_load_existing_segment_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;
loaded_segment.load().await.unwrap();
let loaded_messages = loaded_segment.get_messages(0, 10).await.unwrap();

Expand Down Expand Up @@ -137,7 +143,91 @@ async fn should_persist_and_load_segment_with_messages() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
.await;
segment.persist().await.unwrap();
assert_persisted_segment(
&setup
.config
.get_partition_path(stream_id, topic_id, partition_id),
start_offset,
)
.await;
let messages_count = 10;
let mut messages = Vec::new();
let mut batch_size = IggyByteSize::default();
for i in 0..messages_count {
let message = create_message(i, "test", IggyTimestamp::now());

let retained_message = Arc::new(RetainedMessage {
id: message.id,
offset: message.offset,
timestamp: message.timestamp,
checksum: message.checksum,
message_state: message.state,
headers: message.headers.map(|headers| headers.to_bytes()),
payload: message.payload.clone(),
});
batch_size += retained_message.get_size_bytes();
messages.push(retained_message);
}

segment
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages(None).await.unwrap();
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
)
.await;
loaded_segment.load().await.unwrap();
let messages = loaded_segment
.get_messages(0, messages_count as u32)
.await
.unwrap();
assert_eq!(messages.len(), messages_count as usize);
}

#[tokio::test]
async fn should_persist_and_load_segment_with_messages_with_nowait_confirmation() {
let setup = TestSetup::init().await;
let stream_id = 1;
let topic_id = 2;
let partition_id = 3;
let start_offset = 0;
let mut segment = segment::Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -173,7 +263,11 @@ async fn should_persist_and_load_segment_with_messages() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment
.persist_messages(Some(Confirmation::Nowait))
.await
.unwrap();
sleep(Duration::from_millis(200)).await;
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
Expand All @@ -188,7 +282,8 @@ async fn should_persist_and_load_segment_with_messages() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;
loaded_segment.load().await.unwrap();
let messages = loaded_segment
.get_messages(0, messages_count as u32)
Expand Down Expand Up @@ -220,7 +315,8 @@ async fn given_all_expired_messages_segment_should_be_expired() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -258,7 +354,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();

segment.is_closed = true;
let is_expired = segment.is_expired(now).await;
Expand Down Expand Up @@ -288,7 +384,8 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -343,7 +440,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.append_batch(not_expired_message_size, 1, &not_expired_messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();

let is_expired = segment.is_expired(now).await;
assert!(!is_expired);
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/streaming/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn should_purge_existing_stream_on_disk() {
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>();
topic
.append_messages(batch_size, Partitioning::partition_id(1), messages)
.append_messages(batch_size, Partitioning::partition_id(1), messages, None)
.await
.unwrap();
let loaded_messages = topic
Expand Down
Loading
Loading