Skip to content

Commit 0a1b1a7

Browse files
authored
Add checksum validation to state entry storage (#1048)
1 parent 43457b0 commit 0a1b1a7

File tree

6 files changed

+73
-21
lines changed

6 files changed

+73
-21
lines changed

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration/tests/state/file.rs

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ async fn should_apply_single_entry() {
4242
assert_eq!(entry.term, 0);
4343
assert_eq!(entry.version, setup.version());
4444
assert_eq!(entry.flags, 0);
45+
assert!(entry.checksum > 0);
4546
assert!(entry.timestamp.as_micros() > 0);
4647
assert_eq!(entry.user_id, user_id);
4748
assert_eq!(entry.command, command_clone);
@@ -101,6 +102,7 @@ async fn should_apply_multiple_entries() {
101102
assert_eq!(create_user_entry.term, 0);
102103
assert_eq!(create_user_entry.version, setup.version());
103104
assert_eq!(create_user_entry.flags, 0);
105+
assert!(create_user_entry.checksum > 0);
104106
assert!(create_user_entry.timestamp.as_micros() > 0);
105107
assert_eq!(create_user_entry.user_id, 1);
106108
assert!(create_user_entry.context.is_empty());
@@ -111,6 +113,7 @@ async fn should_apply_multiple_entries() {
111113
assert_eq!(create_stream_entry.term, 0);
112114
assert_eq!(create_stream_entry.version, setup.version());
113115
assert_eq!(create_stream_entry.flags, 0);
116+
assert!(create_stream_entry.checksum > 0);
114117
assert!(create_stream_entry.timestamp.as_micros() > 0);
115118
assert!(create_stream_entry.timestamp.as_micros() > create_user_entry.timestamp.as_micros());
116119
assert_eq!(create_stream_entry.user_id, 2);

sdk/src/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub enum IggyError {
3636
StateFileNotFound = 14,
3737
#[error("State file corrupted")]
3838
StateFileCorrupted = 15,
39+
#[error("Invalid state entry checksum: {0}, expected: {1}, for index: {2}")]
40+
InvalidStateEntryChecksum(u32, u32, u64) = 16,
3941
#[error("Cannot open database, Path: {0}")]
4042
CannotOpenDatabase(String) = 19,
4143
#[error("Resource with key: {0} was not found.")]

server/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "server"
3-
version = "0.3.0"
3+
version = "0.3.1"
44
edition = "2021"
55
build = "src/build.rs"
66

server/src/state/entry.rs

+47-9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::state::command::EntryCommand;
22
use bytes::{Buf, BufMut, Bytes, BytesMut};
33
use iggy::bytes_serializable::BytesSerializable;
44
use iggy::error::IggyError;
5+
use iggy::utils::checksum;
56
use iggy::utils::timestamp::IggyTimestamp;
67
use std::fmt::{Display, Formatter};
78

@@ -13,6 +14,7 @@ use std::fmt::{Display, Formatter};
1314
/// - `flags` - Reserved for future use
1415
/// - `timestamp` - Timestamp when the command was issued
1516
/// - `user_id` - User ID of the user who issued the command
17+
/// - `checksum` - Checksum of the entry
1618
/// - `code` - Command code
1719
/// - `command` - Payload of the command
1820
/// - `context` - Optional context e.g. used to enrich the payload with additional data
@@ -25,8 +27,9 @@ pub struct StateEntry {
2527
pub flags: u64,
2628
pub timestamp: IggyTimestamp,
2729
pub user_id: u32,
28-
pub command: EntryCommand,
30+
pub checksum: u32,
2931
pub context: Bytes,
32+
pub command: EntryCommand,
3033
}
3134

3235
impl StateEntry {
@@ -50,25 +53,57 @@ impl StateEntry {
5053
flags,
5154
timestamp,
5255
user_id,
56+
checksum: Self::calculate_checksum(
57+
index, term, leader_id, version, flags, timestamp, user_id, &context, &command,
58+
),
5359
context,
5460
command,
5561
}
5662
}
63+
64+
#[allow(clippy::too_many_arguments)]
65+
fn calculate_checksum(
66+
index: u64,
67+
term: u64,
68+
leader_id: u32,
69+
version: u32,
70+
flags: u64,
71+
timestamp: IggyTimestamp,
72+
user_id: u32,
73+
context: &Bytes,
74+
command: &EntryCommand,
75+
) -> u32 {
76+
let command = command.to_bytes();
77+
let mut bytes =
78+
BytesMut::with_capacity(8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + context.len() + command.len());
79+
bytes.put_u64_le(index);
80+
bytes.put_u64_le(term);
81+
bytes.put_u32_le(leader_id);
82+
bytes.put_u32_le(version);
83+
bytes.put_u64_le(flags);
84+
bytes.put_u64_le(timestamp.into());
85+
bytes.put_u32_le(user_id);
86+
bytes.put_u32_le(context.len() as u32);
87+
bytes.put_slice(context);
88+
bytes.extend(command);
89+
checksum::calculate(&bytes.freeze())
90+
}
5791
}
5892

5993
impl Display for StateEntry {
6094
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
6195
write!(
6296
f,
63-
"StateEntry {{ index: {}, term: {}, leader ID: {}, version: {}, flags: {}, timestamp: {}, user ID: {}, command: {:?} }}",
97+
"StateEntry {{ index: {}, term: {}, leader ID: {}, version: {}, flags: {}, timestamp: {}, user ID: {}, checksum: {}, command: {:?} }}",
6498
self.index,
6599
self.term,
66100
self.leader_id,
67101
self.version,
68102
self.flags,
69103
self.timestamp,
70104
self.user_id,
71-
self.command,
105+
self.checksum,
106+
self.command
72107
)
73108
}
74109
}
@@ -77,18 +112,19 @@ impl BytesSerializable for StateEntry {
77112
fn to_bytes(&self) -> Bytes {
78113
let command = self.command.to_bytes();
79114
let mut bytes = BytesMut::with_capacity(
80-
8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + self.context.len() + command.len(),
115+
8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + 4 + self.context.len() + command.len(),
81116
);
82117
bytes.put_u64_le(self.index);
83118
bytes.put_u64_le(self.term);
84119
bytes.put_u32_le(self.leader_id);
85120
bytes.put_u32_le(self.version);
86121
bytes.put_u64_le(self.flags);
87-
bytes.put_u64_le(self.timestamp.as_micros());
122+
bytes.put_u64_le(self.timestamp.into());
88123
bytes.put_u32_le(self.user_id);
124+
bytes.put_u32_le(self.checksum);
89125
bytes.put_u32_le(self.context.len() as u32);
90126
bytes.put_slice(&self.context);
91-
bytes.extend(self.command.to_bytes());
127+
bytes.extend(command);
92128
bytes.freeze()
93129
}
94130

@@ -103,9 +139,10 @@ impl BytesSerializable for StateEntry {
103139
let flags = bytes.slice(24..32).get_u64_le();
104140
let timestamp = IggyTimestamp::from(bytes.slice(32..40).get_u64_le());
105141
let user_id = bytes.slice(40..44).get_u32_le();
106-
let context_length = bytes.slice(44..48).get_u32_le() as usize;
107-
let context = bytes.slice(48..48 + context_length);
108-
let command = EntryCommand::from_bytes(bytes.slice(48 + context_length..))?;
142+
let checksum = bytes.slice(44..48).get_u32_le();
143+
let context_length = bytes.slice(48..52).get_u32_le() as usize;
144+
let context = bytes.slice(52..52 + context_length);
145+
let command = EntryCommand::from_bytes(bytes.slice(52 + context_length..))?;
109146

110147
Ok(StateEntry {
111148
index,
@@ -115,6 +152,7 @@ impl BytesSerializable for StateEntry {
115152
flags,
116153
timestamp,
117154
user_id,
155+
checksum,
118156
context,
119157
command,
120158
})

server/src/state/file.rs

+19-10
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ impl State for FileState {
117117
let flags = reader.read_u64_le().await?;
118118
let timestamp = IggyTimestamp::from(reader.read_u64_le().await?);
119119
let user_id = reader.read_u32_le().await?;
120+
let checksum = reader.read_u32_le().await?;
120121
let context_length = reader.read_u32_le().await? as usize;
121122
let mut context = BytesMut::with_capacity(context_length);
122123
context.put_bytes(0, context_length);
@@ -143,6 +144,14 @@ impl State for FileState {
143144
command,
144145
);
145146
debug!("Read state entry: {entry}");
147+
if entry.checksum != checksum {
148+
return Err(IggyError::InvalidStateEntryChecksum(
149+
entry.checksum,
150+
checksum,
151+
entry.index,
152+
));
153+
}
154+
146155
entries.push(entry);
147156
total_size += 8
148157
+ 8
@@ -152,6 +161,7 @@ impl State for FileState {
152161
+ 8
153162
+ 4
154163
+ 4
164+
+ 4
155165
+ context_length as u64
156166
+ 4
157167
+ 4
@@ -167,22 +177,21 @@ impl State for FileState {
167177

168178
async fn apply(&self, user_id: u32, command: EntryCommand) -> Result<(), IggyError> {
169179
debug!("Applying state entry with command: {command}, user ID: {user_id}");
170-
let entry = StateEntry {
171-
index: if self.entries_count.load(Ordering::SeqCst) == 0 {
180+
let entry = StateEntry::new(
181+
if self.entries_count.load(Ordering::SeqCst) == 0 {
172182
0
173183
} else {
174184
self.current_index.fetch_add(1, Ordering::SeqCst) + 1
175185
},
176-
term: self.term.load(Ordering::SeqCst),
177-
leader_id: self.current_leader.load(Ordering::SeqCst),
178-
version: self.version,
179-
flags: 0,
180-
timestamp: IggyTimestamp::now(),
186+
self.term.load(Ordering::SeqCst),
187+
self.current_leader.load(Ordering::SeqCst),
188+
self.version,
189+
0,
190+
IggyTimestamp::now(),
181191
user_id,
192+
Bytes::new(),
182193
command,
183-
context: Bytes::new(),
184-
};
185-
194+
);
186195
self.entries_count.fetch_add(1, Ordering::SeqCst);
187196
self.persister.append(&self.path, &entry.to_bytes()).await?;
188197
debug!("Applied state entry: {entry}");

0 commit comments

Comments
 (0)