Skip to content

Commit 64a945f

Browse files
corrected logic for schema key
1 parent b09cebf commit 64a945f

File tree

2 files changed

+29
-23
lines changed

2 files changed

+29
-23
lines changed

src/event/mod.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,17 @@ pub struct Event {
4747
// Events holds the schema related to a each event for a single log stream
4848
impl Event {
4949
pub async fn process(self) -> Result<(), EventError> {
50-
let key = get_schema_key(&self.rb.schema().fields);
50+
let mut key = get_schema_key(&self.rb.schema().fields);
51+
if self.time_partition.is_some() {
52+
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
53+
key.push_str(&parsed_timestamp_to_min);
54+
}
55+
56+
if !self.custom_partition_values.is_empty() {
57+
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
58+
key.push_str(&format!("&{k}={v}"));
59+
}
60+
}
5161
if self.is_first_event {
5262
commit_schema(&self.stream_name, self.rb.schema())?;
5363
}

src/staging/streams.rs

+18-22
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ impl<'a> Stream<'a> {
165165
.filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
166166
.sorted_by_key(|f| f.metadata().unwrap().created().unwrap())
167167
.collect();
168-
168+
169169
paths
170170
}
171171

@@ -179,33 +179,29 @@ impl<'a> Stream<'a> {
179179
shutdown_signal: bool,
180180
) -> HashMap<PathBuf, Vec<PathBuf>> {
181181
let now = Utc::now();
182-
182+
183183
// Extract date and time components of current time
184184
let now_date = (now.year(), now.month(), now.day());
185185
let now_time = (now.hour(), now.minute());
186186

187187
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
188188
let mut arrow_files = self.arrow_files();
189-
arrow_files = arrow_files
190-
.into_iter()
191-
.filter(|path| {
192-
let created_at = path.metadata().unwrap().created().unwrap();
193-
let created_at: DateTime<Utc> = created_at.into();
194-
let created_date = (created_at.year(), created_at.month(), created_at.day());
195-
let created_time = (created_at.hour(), created_at.minute());
196-
197-
let same_date = now_date == created_date;
198-
let same_time = now_time == created_time;
199-
200-
// if the shutdown signal is false i.e. normal condition
201-
// don't keep the ones for the current minute
202-
if !shutdown_signal {
203-
!same_date || !same_time
204-
} else {
205-
true
206-
}
207-
})
208-
.collect();
189+
arrow_files.retain(|path| {
190+
let created_at = path.metadata().unwrap().created().unwrap();
191+
let created_at: DateTime<Utc> = created_at.into();
192+
let created_date = (created_at.year(), created_at.month(), created_at.day());
193+
let created_time = (created_at.hour(), created_at.minute());
194+
195+
let same_date = now_date == created_date;
196+
let same_time = now_time == created_time;
197+
// if the shutdown signal is false i.e. normal condition
198+
// don't keep the ones for the current minute
199+
if !shutdown_signal {
200+
!same_date || !same_time
201+
} else {
202+
true
203+
}
204+
});
209205

210206
let random_string =
211207
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);

0 commit comments

Comments
 (0)