Skip to content

Commit 52cfd04

Browse files
committed
Add support for "nowait" mode in file synchronization
1 parent 00f66ea commit 52cfd04

34 files changed

+497
-119
lines changed

configs/server.toml

+15
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,14 @@ path = "compatibility"
338338
# `false` allows the OS to manage write operations, which can improve performance.
339339
enforce_fsync = false
340340

341+
# Maximum number of retries for a failed file operation (e.g., append, overwrite).
342+
# This defines how many times the system will attempt the operation before failing.
343+
max_file_operation_retries = 1
344+
345+
# Delay between retries in case of a failed file operation.
346+
# This helps to avoid immediate repeated attempts and can reduce load.
347+
retry_delay = "1 s"
348+
341349
# Runtime configuration.
342350
[system.runtime]
343351
# Path for storing runtime data.
@@ -452,6 +460,13 @@ size = "1 GB"
452460
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
453461
message_expiry = "none"
454462

463+
# Defines the file system confirmation behavior during state updates.
464+
# Controls how the system waits for file write operations to complete.
465+
# Possible values:
466+
# - "wait": waits for the file operation to complete before proceeding.
467+
# - "nowait": proceeds without waiting for the file operation to finish, potentially increasing performance but at the cost of durability.
468+
server_confirmation = "wait"
469+
455470
# Configures whether expired segments are archived (boolean) or just deleted without archiving.
456471
archive_expired = false
457472

integration/tests/streaming/messages.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
4646
Arc::new(AtomicU64::new(0)),
4747
Arc::new(AtomicU32::new(0)),
4848
IggyTimestamp::now(),
49-
);
49+
)
50+
.await;
5051

5152
let mut messages = Vec::with_capacity(messages_count as usize);
5253
let mut appended_messages = Vec::with_capacity(messages_count as usize);
@@ -119,12 +120,12 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
119120
partition.partition_id,
120121
);
121122
partition
122-
.append_messages(appendable_batch_info, messages)
123+
.append_messages(appendable_batch_info, messages, None)
123124
.await
124125
.unwrap();
125126
let test_timestamp = IggyTimestamp::now();
126127
partition
127-
.append_messages(appendable_batch_info_two, messages_two)
128+
.append_messages(appendable_batch_info_two, messages_two, None)
128129
.await
129130
.unwrap();
130131

@@ -183,7 +184,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
183184
Arc::new(AtomicU64::new(0)),
184185
Arc::new(AtomicU32::new(0)),
185186
IggyTimestamp::now(),
186-
);
187+
)
188+
.await;
187189

188190
let mut messages = Vec::with_capacity(messages_count as usize);
189191
let mut appended_messages = Vec::with_capacity(messages_count as usize);
@@ -229,7 +231,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
229231
partition.partition_id,
230232
);
231233
partition
232-
.append_messages(appendable_batch_info, messages)
234+
.append_messages(appendable_batch_info, messages, None)
233235
.await
234236
.unwrap();
235237
assert_eq!(partition.unsaved_messages_count, 0);
@@ -249,7 +251,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
249251
Arc::new(AtomicU64::new(0)),
250252
Arc::new(AtomicU32::new(0)),
251253
now,
252-
);
254+
)
255+
.await;
253256
let partition_state = PartitionState {
254257
id: partition.partition_id,
255258
created_at: now,

integration/tests/streaming/partition.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ async fn should_persist_partition_with_segment() {
3535
Arc::new(AtomicU64::new(0)),
3636
Arc::new(AtomicU32::new(0)),
3737
IggyTimestamp::now(),
38-
);
38+
)
39+
.await;
3940

4041
partition.persist().await.unwrap();
4142

@@ -66,7 +67,8 @@ async fn should_load_existing_partition_from_disk() {
6667
Arc::new(AtomicU64::new(0)),
6768
Arc::new(AtomicU32::new(0)),
6869
IggyTimestamp::now(),
69-
);
70+
)
71+
.await;
7072
partition.persist().await.unwrap();
7173
assert_persisted_partition(&partition.partition_path, with_segment).await;
7274

@@ -85,7 +87,8 @@ async fn should_load_existing_partition_from_disk() {
8587
Arc::new(AtomicU64::new(0)),
8688
Arc::new(AtomicU32::new(0)),
8789
now,
88-
);
90+
)
91+
.await;
8992
let partition_state = PartitionState {
9093
id: partition.partition_id,
9194
created_at: now,
@@ -139,7 +142,8 @@ async fn should_delete_existing_partition_from_disk() {
139142
Arc::new(AtomicU64::new(0)),
140143
Arc::new(AtomicU32::new(0)),
141144
IggyTimestamp::now(),
142-
);
145+
)
146+
.await;
143147
partition.persist().await.unwrap();
144148
assert_persisted_partition(&partition.partition_path, with_segment).await;
145149

@@ -172,7 +176,8 @@ async fn should_purge_existing_partition_on_disk() {
172176
Arc::new(AtomicU64::new(0)),
173177
Arc::new(AtomicU32::new(0)),
174178
IggyTimestamp::now(),
175-
);
179+
)
180+
.await;
176181
partition.persist().await.unwrap();
177182
assert_persisted_partition(&partition.partition_path, with_segment).await;
178183
let messages = create_messages();
@@ -185,7 +190,7 @@ async fn should_purge_existing_partition_on_disk() {
185190
partition.partition_id,
186191
);
187192
partition
188-
.append_messages(appendable_batch_info, messages)
193+
.append_messages(appendable_batch_info, messages, None)
189194
.await
190195
.unwrap();
191196
let loaded_messages = partition.get_messages_by_offset(0, 100).await.unwrap();

integration/tests/streaming/segment.rs

+107-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::streaming::common::test_setup::TestSetup;
22
use bytes::Bytes;
33
use iggy::bytes_serializable::BytesSerializable;
4+
use iggy::confirmation::Confirmation;
45
use iggy::models::messages::{MessageState, PolledMessage};
56
use iggy::utils::byte_size::IggyByteSize;
67
use iggy::utils::expiry::IggyExpiry;
@@ -11,7 +12,9 @@ use server::streaming::segments::segment;
1112
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
1213
use std::sync::atomic::AtomicU64;
1314
use std::sync::Arc;
15+
use std::time::Duration;
1416
use tokio::fs;
17+
use tokio::time::sleep;
1518

1619
#[tokio::test]
1720
async fn should_persist_segment() {
@@ -35,7 +38,8 @@ async fn should_persist_segment() {
3538
Arc::new(AtomicU64::new(0)),
3639
Arc::new(AtomicU64::new(0)),
3740
Arc::new(AtomicU64::new(0)),
38-
);
41+
)
42+
.await;
3943

4044
setup
4145
.create_partition_directory(stream_id, topic_id, partition_id)
@@ -73,7 +77,8 @@ async fn should_load_existing_segment_from_disk() {
7377
Arc::new(AtomicU64::new(0)),
7478
Arc::new(AtomicU64::new(0)),
7579
Arc::new(AtomicU64::new(0)),
76-
);
80+
)
81+
.await;
7782
setup
7883
.create_partition_directory(stream_id, topic_id, partition_id)
7984
.await;
@@ -100,7 +105,8 @@ async fn should_load_existing_segment_from_disk() {
100105
Arc::new(AtomicU64::new(0)),
101106
Arc::new(AtomicU64::new(0)),
102107
Arc::new(AtomicU64::new(0)),
103-
);
108+
)
109+
.await;
104110
loaded_segment.load().await.unwrap();
105111
let loaded_messages = loaded_segment.get_messages(0, 10).await.unwrap();
106112

@@ -137,7 +143,91 @@ async fn should_persist_and_load_segment_with_messages() {
137143
Arc::new(AtomicU64::new(0)),
138144
Arc::new(AtomicU64::new(0)),
139145
Arc::new(AtomicU64::new(0)),
140-
);
146+
)
147+
.await;
148+
149+
setup
150+
.create_partition_directory(stream_id, topic_id, partition_id)
151+
.await;
152+
segment.persist().await.unwrap();
153+
assert_persisted_segment(
154+
&setup
155+
.config
156+
.get_partition_path(stream_id, topic_id, partition_id),
157+
start_offset,
158+
)
159+
.await;
160+
let messages_count = 10;
161+
let mut messages = Vec::new();
162+
let mut batch_size = IggyByteSize::default();
163+
for i in 0..messages_count {
164+
let message = create_message(i, "test", IggyTimestamp::now());
165+
166+
let retained_message = Arc::new(RetainedMessage {
167+
id: message.id,
168+
offset: message.offset,
169+
timestamp: message.timestamp,
170+
checksum: message.checksum,
171+
message_state: message.state,
172+
headers: message.headers.map(|headers| headers.to_bytes()),
173+
payload: message.payload.clone(),
174+
});
175+
batch_size += retained_message.get_size_bytes();
176+
messages.push(retained_message);
177+
}
178+
179+
segment
180+
.append_batch(batch_size, messages_count as u32, &messages)
181+
.await
182+
.unwrap();
183+
segment.persist_messages(None).await.unwrap();
184+
let mut loaded_segment = segment::Segment::create(
185+
stream_id,
186+
topic_id,
187+
partition_id,
188+
start_offset,
189+
setup.config.clone(),
190+
setup.storage.clone(),
191+
IggyExpiry::NeverExpire,
192+
Arc::new(AtomicU64::new(0)),
193+
Arc::new(AtomicU64::new(0)),
194+
Arc::new(AtomicU64::new(0)),
195+
Arc::new(AtomicU64::new(0)),
196+
Arc::new(AtomicU64::new(0)),
197+
Arc::new(AtomicU64::new(0)),
198+
)
199+
.await;
200+
loaded_segment.load().await.unwrap();
201+
let messages = loaded_segment
202+
.get_messages(0, messages_count as u32)
203+
.await
204+
.unwrap();
205+
assert_eq!(messages.len(), messages_count as usize);
206+
}
207+
208+
#[tokio::test]
209+
async fn should_persist_and_load_segment_with_messages_with_nowait_confirmation() {
210+
let setup = TestSetup::init().await;
211+
let stream_id = 1;
212+
let topic_id = 2;
213+
let partition_id = 3;
214+
let start_offset = 0;
215+
let mut segment = segment::Segment::create(
216+
stream_id,
217+
topic_id,
218+
partition_id,
219+
start_offset,
220+
setup.config.clone(),
221+
setup.storage.clone(),
222+
IggyExpiry::NeverExpire,
223+
Arc::new(AtomicU64::new(0)),
224+
Arc::new(AtomicU64::new(0)),
225+
Arc::new(AtomicU64::new(0)),
226+
Arc::new(AtomicU64::new(0)),
227+
Arc::new(AtomicU64::new(0)),
228+
Arc::new(AtomicU64::new(0)),
229+
)
230+
.await;
141231

142232
setup
143233
.create_partition_directory(stream_id, topic_id, partition_id)
@@ -173,7 +263,11 @@ async fn should_persist_and_load_segment_with_messages() {
173263
.append_batch(batch_size, messages_count as u32, &messages)
174264
.await
175265
.unwrap();
176-
segment.persist_messages().await.unwrap();
266+
segment
267+
.persist_messages(Some(Confirmation::Nowait))
268+
.await
269+
.unwrap();
270+
sleep(Duration::from_millis(200)).await;
177271
let mut loaded_segment = segment::Segment::create(
178272
stream_id,
179273
topic_id,
@@ -188,7 +282,8 @@ async fn should_persist_and_load_segment_with_messages() {
188282
Arc::new(AtomicU64::new(0)),
189283
Arc::new(AtomicU64::new(0)),
190284
Arc::new(AtomicU64::new(0)),
191-
);
285+
)
286+
.await;
192287
loaded_segment.load().await.unwrap();
193288
let messages = loaded_segment
194289
.get_messages(0, messages_count as u32)
@@ -220,7 +315,8 @@ async fn given_all_expired_messages_segment_should_be_expired() {
220315
Arc::new(AtomicU64::new(0)),
221316
Arc::new(AtomicU64::new(0)),
222317
Arc::new(AtomicU64::new(0)),
223-
);
318+
)
319+
.await;
224320

225321
setup
226322
.create_partition_directory(stream_id, topic_id, partition_id)
@@ -258,7 +354,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
258354
.append_batch(batch_size, messages_count as u32, &messages)
259355
.await
260356
.unwrap();
261-
segment.persist_messages().await.unwrap();
357+
segment.persist_messages(None).await.unwrap();
262358

263359
segment.is_closed = true;
264360
let is_expired = segment.is_expired(now).await;
@@ -288,7 +384,8 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
288384
Arc::new(AtomicU64::new(0)),
289385
Arc::new(AtomicU64::new(0)),
290386
Arc::new(AtomicU64::new(0)),
291-
);
387+
)
388+
.await;
292389

293390
setup
294391
.create_partition_directory(stream_id, topic_id, partition_id)
@@ -343,7 +440,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
343440
.append_batch(not_expired_message_size, 1, &not_expired_messages)
344441
.await
345442
.unwrap();
346-
segment.persist_messages().await.unwrap();
443+
segment.persist_messages(None).await.unwrap();
347444

348445
let is_expired = segment.is_expired(now).await;
349446
assert!(!is_expired);

integration/tests/streaming/stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ async fn should_purge_existing_stream_on_disk() {
134134
.map(|msg| msg.get_size_bytes())
135135
.sum::<IggyByteSize>();
136136
topic
137-
.append_messages(batch_size, Partitioning::partition_id(1), messages)
137+
.append_messages(batch_size, Partitioning::partition_id(1), messages, None)
138138
.await
139139
.unwrap();
140140
let loaded_messages = topic

0 commit comments

Comments
 (0)