Commit fba51f1

Merge remote-tracking branch 'origin/main' into query-param

2 parents: 4b0c0dc + 4af9a5e


59 files changed: +2348 −1333 lines

Diff for: Cargo.lock (−1)

Generated file; contents not rendered by default.

Diff for: Cargo.toml (−1)

@@ -106,7 +106,6 @@ bytes = "1.4"
 clokwerk = "0.4"
 derive_more = { version = "1", features = ["full"] }
 itertools = "0.14"
-lazy_static = "1.4"
 once_cell = "1.20"
 rand = "0.8.5"
 regex = "1.7.3"
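The dropped lazy_static dependency is already covered by once_cell, which stays in the dependency list. As a minimal sketch (not taken from this diff; the static name and contents are illustrative), a lazily initialized static that would previously have lived inside a lazy_static! block becomes an ordinary item wrapping once_cell::sync::Lazy:

```rust
use once_cell::sync::Lazy;
use std::collections::HashMap;

// With once_cell, the lazily initialized static is a plain item; the closure
// runs once, on first access, instead of being wrapped in lazy_static! { ... }.
static COUNTRY_CODES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(|| {
    let mut m = HashMap::new();
    m.insert("IN", "India");
    m.insert("US", "United States");
    m
});

fn main() {
    // First access initializes the map; later accesses reuse it.
    println!("{}", COUNTRY_CODES["IN"]);
}
```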

Diff for: src/alerts/mod.rs (+50)

@@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode;
 use http::StatusCode;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use serde::Serialize;
 use serde_json::Error as SerdeError;
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Display};

@@ -873,3 +874,52 @@ impl Alerts {
         Ok(())
     }
 }
+
+#[derive(Debug, Serialize)]
+pub struct AlertsInfo {
+    total: u64,
+    silenced: u64,
+    resolved: u64,
+    triggered: u64,
+    low: u64,
+    medium: u64,
+    high: u64,
+}
+
+// TODO: add RBAC
+pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
+    let alerts = ALERTS.alerts.read().await;
+    let mut total = 0;
+    let mut silenced = 0;
+    let mut resolved = 0;
+    let mut triggered = 0;
+    let mut low = 0;
+    let mut medium = 0;
+    let mut high = 0;
+
+    for (_, alert) in alerts.iter() {
+        total += 1;
+        match alert.state {
+            AlertState::Silenced => silenced += 1,
+            AlertState::Resolved => resolved += 1,
+            AlertState::Triggered => triggered += 1,
+        }
+
+        match alert.severity {
+            Severity::Low => low += 1,
+            Severity::Medium => medium += 1,
+            Severity::High => high += 1,
+            _ => {}
+        }
+    }
+
+    Ok(AlertsInfo {
+        total,
+        silenced,
+        resolved,
+        triggered,
+        low,
+        medium,
+        high,
+    })
+}
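The new get_alerts_info helper makes one pass over the alert store, tallying one counter per state and severity variant, and returns a Serialize-able summary. A self-contained sketch of the same counting-and-serializing pattern, with a simplified in-memory slice standing in for the crate's ALERTS store (serde and serde_json assumed as dependencies; the type and function names here are illustrative):

```rust
use serde::Serialize;

enum AlertState { Silenced, Resolved, Triggered }

#[derive(Debug, Serialize)]
struct AlertsInfo {
    total: u64,
    silenced: u64,
    resolved: u64,
    triggered: u64,
}

// Counterpart of get_alerts_info: one pass over the alerts, one counter per variant.
fn summarize(states: &[AlertState]) -> AlertsInfo {
    let (mut silenced, mut resolved, mut triggered) = (0, 0, 0);
    for state in states {
        match state {
            AlertState::Silenced => silenced += 1,
            AlertState::Resolved => resolved += 1,
            AlertState::Triggered => triggered += 1,
        }
    }
    AlertsInfo { total: states.len() as u64, silenced, resolved, triggered }
}

fn main() {
    let info = summarize(&[AlertState::Triggered, AlertState::Resolved, AlertState::Triggered]);
    // Serialize the summary the way an HTTP layer would return it.
    println!("{}", serde_json::to_string_pretty(&info).unwrap());
}
```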

Diff for: src/catalog/mod.rs (+7)

@@ -340,6 +340,12 @@ pub async fn remove_manifest_from_snapshot(
             Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
         }
         Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
+        Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
+            std::io::Error::new(
+                std::io::ErrorKind::Unsupported,
+                "Can't remove manifest from within Index server",
+            ),
+        ))),
     }
 }

@@ -350,6 +356,7 @@ pub async fn get_first_event(
 ) -> Result<Option<String>, ObjectStorageError> {
     let mut first_event_at: String = String::default();
     match PARSEABLE.options.mode {
+        Mode::Index => unimplemented!(),
        Mode::All | Mode::Ingest => {
            // get current snapshot
            let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
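The new Index-mode arm rejects the operation by wrapping a std::io::Error of kind Unsupported in the storage error's boxed-error variant. A minimal standalone sketch of the same pattern, using a hypothetical ServerMode enum and a plain boxed error in place of ObjectStorageError::UnhandledError:

```rust
use std::error::Error;
use std::io;

// Hypothetical stand-in for the server's Mode enum.
enum ServerMode { All, Ingest, Query, Index }

fn remove_manifest(mode: &ServerMode) -> Result<(), Box<dyn Error>> {
    match mode {
        ServerMode::All | ServerMode::Ingest | ServerMode::Query => Ok(()),
        // Index servers do not own manifests, so the operation is rejected.
        ServerMode::Index => Err(Box::new(io::Error::new(
            io::ErrorKind::Unsupported,
            "Can't remove manifest from within Index server",
        ))),
    }
}

fn main() {
    if let Err(e) = remove_manifest(&ServerMode::Index) {
        eprintln!("refused: {e}");
    }
}
```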

Diff for: src/connectors/kafka/processor.rs (+17 −37)

@@ -16,10 +16,9 @@
  *
  */

-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;

 use async_trait::async_trait;
-use chrono::Utc;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;

@@ -58,37 +57,10 @@ impl ParseableSinkProcessor {
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
+        let custom_partition = stream.get_custom_partition();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();

-        let (json_vec, total_payload_size) = Self::json_vec(records);
-        let batch_json_event = json::Event {
-            data: Value::Array(json_vec),
-        };
-
-        let (rb, is_first) = batch_json_event.into_recordbatch(
-            &schema,
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        let p_event = ParseableEvent {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: total_payload_size,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        };
-
-        Ok(p_event)
-    }
-
-    fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;

@@ -99,22 +71,30 @@ impl ParseableSinkProcessor {
             }
         }

-        (json_vec, total_payload_size)
+        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
+            stream_name.to_string(),
+            total_payload_size,
+            &schema,
+            static_schema_flag,
+            custom_partition.as_ref(),
+            time_partition.as_ref(),
+            schema_version,
+            StreamType::UserDefined,
+        )?;
+
+        Ok(p_event)
     }
 }

 #[async_trait]
 impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
     async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
         let len = records.len();
-        debug!("Processing {} records", len);
+        debug!("Processing {len} records");

-        self.build_event_from_chunk(&records)
-            .await?
-            .process()
-            .await?;
+        self.build_event_from_chunk(&records).await?.process()?;

-        debug!("Processed {} records", len);
+        debug!("Processed {len} records");
         Ok(())
     }
 }
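For context, the Processor trait implemented above is an async trait with one type parameter for the input chunk and one for the output. A minimal standalone sketch of the same shape, using the async-trait crate with tokio and anyhow assumed as dependencies (the record and trait definitions here are illustrative stand-ins, not the crate's actual ones):

```rust
use async_trait::async_trait;

// Illustrative stand-ins for ConsumerRecord and the Processor trait.
struct ConsumerRecord {
    payload: Vec<u8>,
}

#[async_trait]
trait Processor<In, Out> {
    async fn process(&self, input: In) -> anyhow::Result<Out>;
}

struct SinkProcessor;

#[async_trait]
impl Processor<Vec<ConsumerRecord>, ()> for SinkProcessor {
    async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
        // Sum payload sizes, the same way the sink tallies total_payload_size.
        let total: usize = records.iter().map(|r| r.payload.len()).sum();
        println!("processed {} records ({total} bytes)", records.len());
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let processor = SinkProcessor;
    processor.process(vec![ConsumerRecord { payload: b"{}".to_vec() }]).await
}
```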

Diff for: src/enterprise/mod.rs (+1)

@@ -0,0 +1 @@
+pub mod utils;

Diff for: src/enterprise/utils.rs (+159, new file)

@@ -0,0 +1,159 @@
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
+
+use datafusion::{common::Column, prelude::Expr};
+use itertools::Itertools;
+use relative_path::RelativePathBuf;
+
+use crate::query::stream_schema_provider::extract_primary_filter;
+use crate::{
+    catalog::{
+        manifest::{File, Manifest},
+        snapshot, Snapshot,
+    },
+    event,
+    parseable::PARSEABLE,
+    query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
+    storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
+    utils::time::TimeRange,
+};
+
+pub fn create_time_filter(
+    time_range: &TimeRange,
+    time_partition: Option<String>,
+    table_name: &str,
+) -> Vec<Expr> {
+    let mut new_filters = vec![];
+    let start_time = time_range.start.naive_utc();
+    let end_time = time_range.end.naive_utc();
+    let mut _start_time_filter: Expr;
+    let mut _end_time_filter: Expr;
+
+    match time_partition {
+        Some(time_partition) => {
+            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
+                .binary_expr(Expr::Column(Column::new(
+                    Some(table_name.to_owned()),
+                    time_partition.clone(),
+                )));
+            _end_time_filter =
+                PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(
+                    Expr::Column(Column::new(Some(table_name.to_owned()), time_partition)),
+                );
+        }
+        None => {
+            _start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
+                .binary_expr(Expr::Column(Column::new(
+                    Some(table_name.to_owned()),
+                    event::DEFAULT_TIMESTAMP_KEY,
+                )));
+            _end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
+                .binary_expr(Expr::Column(Column::new(
+                    Some(table_name.to_owned()),
+                    event::DEFAULT_TIMESTAMP_KEY,
+                )));
+        }
+    }
+
+    new_filters.push(_start_time_filter);
+    new_filters.push(_end_time_filter);
+
+    new_filters
+}
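create_time_filter above produces a pair of DataFusion expressions that bound the timestamp column: inclusive at the start of the range, exclusive at the end. A rough standalone analogue using plain DataFusion expression builders, assuming only the datafusion crate (PartialTimeFilter and TimeRange are crate-internal and not reproduced here; the column name and timestamps are illustrative):

```rust
use datafusion::prelude::{col, lit, Expr};

// Build [start, end) bounds over a timestamp column, mirroring the
// inclusive-low / exclusive-high filters that create_time_filter produces.
fn time_bounds(column: &str, start: &str, end: &str) -> Vec<Expr> {
    vec![
        col(column).gt_eq(lit(start)),
        col(column).lt(lit(end)),
    ]
}

fn main() {
    for expr in time_bounds("p_timestamp", "2024-01-01T00:00:00", "2024-01-02T00:00:00") {
        println!("{expr}");
    }
}
```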
+
+pub async fn fetch_parquet_file_paths(
+    stream: &str,
+    time_range: &TimeRange,
+) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
+    let glob_storage = PARSEABLE.storage.get_object_store();
+
+    let object_store_format = glob_storage.get_object_store_format(stream).await?;
+
+    let time_partition = object_store_format.time_partition;
+
+    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);
+
+    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);
+
+    let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();
+
+    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
+    let obs = glob_storage
+        .get_objects(
+            Some(&path),
+            Box::new(|file_name| file_name.ends_with("stream.json")),
+        )
+        .await;
+    if let Ok(obs) = obs {
+        for ob in obs {
+            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
+                let snapshot = object_store_format.snapshot;
+                for manifest in snapshot.manifest_list {
+                    merged_snapshot.manifest_list.push(manifest);
+                }
+            }
+        }
+    }
+
+    let manifest_files = collect_manifest_files(
+        glob_storage,
+        merged_snapshot
+            .manifests(&time_filters)
+            .into_iter()
+            .sorted_by_key(|file| file.time_lower_bound)
+            .map(|item| item.manifest_path)
+            .collect(),
+    )
+    .await?;
+
+    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();
+
+    let mut selected_files = manifest_files
+        .into_iter()
+        .flat_map(|file| file.files)
+        .rev()
+        .collect_vec();
+
+    for filter in time_filter_expr {
+        selected_files.retain(|file| !file.can_be_pruned(&filter))
+    }
+
+    selected_files
+        .into_iter()
+        .map(|file| {
+            let date = file.file_path.split("/").collect_vec();
+
+            let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
+
+            let date = RelativePathBuf::from_iter(date);
+
+            parquet_files.entry(date).or_default().push(file);
+        })
+        .for_each(|_| {});
+
+    Ok(parquet_files)
+}
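The last step of fetch_parquet_file_paths groups the surviving files by the date components of their object-store path (path segments 1..4). A self-contained sketch of that grouping step over plain strings, using std types only (the path layout shown is an assumption for illustration):

```rust
use std::collections::HashMap;

// Group file paths by their date prefix, i.e. segments 1..4 of a path like
// "stream/date=.../hour=.../minute=.../file.parquet".
fn group_by_date(paths: Vec<String>) -> HashMap<String, Vec<String>> {
    let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
    for path in paths {
        let segments: Vec<&str> = path.split('/').collect();
        if segments.len() < 5 {
            continue; // skip paths that don't carry the expected prefix
        }
        let date_key = segments[1..4].join("/");
        grouped.entry(date_key).or_default().push(path);
    }
    grouped
}

fn main() {
    let paths = vec![
        "demo/date=2024-01-01/hour=10/minute=05/a.parquet".to_string(),
        "demo/date=2024-01-01/hour=10/minute=06/b.parquet".to_string(),
        "demo/date=2024-01-02/hour=00/minute=00/c.parquet".to_string(),
    ];
    for (date, files) in group_by_date(paths) {
        println!("{date}: {} file(s)", files.len());
    }
}
```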
+
+async fn collect_manifest_files(
+    storage: Arc<dyn ObjectStorage>,
+    manifest_urls: Vec<String>,
+) -> Result<Vec<Manifest>, ObjectStorageError> {
+    let mut tasks = Vec::new();
+    manifest_urls.into_iter().for_each(|path| {
+        let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
+        let storage = Arc::clone(&storage);
+        tasks.push(tokio::task::spawn(async move {
+            storage.get_object(&path).await
+        }));
+    });
+
+    let mut op = Vec::new();
+    for task in tasks {
+        let file = task.await??;
+        op.push(file);
+    }
+
+    Ok(op
+        .into_iter()
+        .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
+        .collect())
+}
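collect_manifest_files fans the manifest downloads out onto tokio tasks and then awaits the handles in order. A minimal sketch of that spawn-then-join pattern with a stubbed fetch in place of storage.get_object (tokio with the full feature set assumed; names are illustrative):

```rust
use std::time::Duration;

// Stand-in for storage.get_object(&path): pretend to fetch one manifest.
async fn fetch(path: String) -> Result<String, std::io::Error> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(format!("manifest bytes for {path}"))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec!["a/manifest.json".to_string(), "b/manifest.json".to_string()];

    // Spawn one task per manifest so the downloads run concurrently.
    let tasks: Vec<_> = urls
        .into_iter()
        .map(|path| tokio::task::spawn(fetch(path)))
        .collect();

    // Await the handles in order; `??` unwraps the JoinError, then the fetch error.
    let mut manifests = Vec::new();
    for task in tasks {
        manifests.push(task.await??);
    }

    println!("fetched {} manifests", manifests.len());
    Ok(())
}
```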
