-
Notifications
You must be signed in to change notification settings - Fork 63
Manifest Splitting #767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Manifest Splitting #767
Changes from 8 commits
de9ab79
14d9048
6aa7e29
70412ec
267c9cf
acf8fa5
2542412
d816f8b
a64252a
8ad20e8
ac42185
34da81b
d935131
d27a2a2
83fef67
b47788c
a56ccce
87af732
63fdc6d
d491c3e
fc965d7
b2167f7
dd95c32
fb89826
edb1d13
4732a61
5b70006
0ce517e
dd9b77d
fc19e7f
22deffe
2121f9a
56158c5
1a0215c
deb5a7e
d749f41
e648329
aa82355
5d2318b
10e3b7e
137f283
1119f81
f3db41b
ac6a0ef
2f42e71
ea50452
2ea8e2a
33966cb
80f8f5d
94bbf9e
6fc7eb6
a76558d
c35b589
f6156b9
48bebf2
0d7e01f
8c4cc59
872a522
77329bf
8d24d01
5d6cefa
5e34e6c
315d1df
9954cda
0b3581d
c1c7688
9e625cf
39125a8
b5812d5
02c95b6
8051f92
bf8936e
371c045
5a955f5
163d31c
0f44af8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,11 @@ use std::{ | |
| use async_trait::async_trait; | ||
| use chrono::{DateTime, Utc}; | ||
| pub use object_store::gcp::GcpCredential; | ||
| use regex::bytes::Regex; | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| use crate::{ | ||
| format::Path, | ||
| storage, | ||
| virtual_chunks::{ContainerName, VirtualChunkContainer, mk_default_containers}, | ||
| }; | ||
|
|
@@ -128,6 +130,82 @@ impl CachingConfig { | |
| } | ||
| } | ||
|
|
||
| #[derive(Debug, PartialEq, Eq, Serialize, Hash, Deserialize, Clone)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub enum ManifestShardCondition { | ||
| Or(Vec<ManifestShardCondition>), | ||
| And(Vec<ManifestShardCondition>), | ||
| PathMatches { regex: String }, | ||
| NameMatches { regex: String }, | ||
| } | ||
|
|
||
| //```yaml | ||
| //rules: | ||
| // - path: ./2m_temperature # regex, 3D variable: (null, latitude, longitude) | ||
| // manifest-split-sizes: | ||
| // - 0: 120 | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To make it clearer, could you give an example where we mix index-based keys and coordinate names?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or maybe that's not possible for the same array?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be ok if it's not possible
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is possible: Python Example Rust Example Since Zarr allows |
||
| // - path: ./temperature # 4D variable: (time, level, latitude, longitude) | ||
| // manifest-split-sizes: | ||
| // - "level": 1 # alternatively 0: 1 | ||
dcherian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // - "time": 12 # and 1: 12 | ||
| // - path: ./temperature | ||
| // manifest-split-sizes: | ||
| // - "level": 1 | ||
| // - "time": 8760 # ~1 year | ||
| // - "latitude": null # for unspecified, default is null, which means never split. | ||
| // - path: ./* # the default rules | ||
| // manifest-split-sizes: null # no splitting, just a single manifest per array | ||
| //``` | ||
|
|
||
| impl ManifestShardCondition { | ||
| // from_yaml? | ||
| pub fn matches(&self, path: &Path) -> bool { | ||
| match self { | ||
| ManifestShardCondition::Or(vec) => vec.iter().any(|c| c.matches(path)), | ||
| ManifestShardCondition::And(vec) => vec.iter().all(|c| c.matches(path)), | ||
| // TODO: precompile the regex | ||
| ManifestShardCondition::PathMatches { regex } => Regex::new(regex) | ||
| .map(|regex| regex.is_match(path.to_string().as_bytes())) | ||
| .unwrap_or(false), | ||
| // TODO: precompile the regex | ||
| ManifestShardCondition::NameMatches { regex } => Regex::new(regex) | ||
| .map(|regex| { | ||
| path.name() | ||
| .map(|name| regex.is_match(name.as_bytes())) | ||
| .unwrap_or(false) | ||
| }) | ||
| .unwrap_or(false), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // FIXME: isn't this really another condition? | ||
| #[derive(Debug, Hash, PartialEq, Eq, Serialize, Deserialize, Clone)] | ||
| pub enum ShardDimCondition { | ||
dcherian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Axis(usize), | ||
| DimensionName(String), | ||
| // TODO: Since dimension name can be null, | ||
| // i don't think we can have DimensionName(r"*") catch the "Any" case | ||
| Any, | ||
| } | ||
|
|
||
| #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] | ||
| pub struct ManifestShardingConfig { | ||
| // TODO: need to preserve insertion order of conditions, so hashmap doesn't work | ||
| pub shard_sizes: Vec<(ManifestShardCondition, Vec<(ShardDimCondition, u32)>)>, | ||
dcherian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| impl Default for ManifestShardingConfig { | ||
| fn default() -> Self { | ||
| let inner = vec![(ShardDimCondition::Any, u32::MAX)]; | ||
| let new = vec![( | ||
| ManifestShardCondition::PathMatches { regex: r".*".to_string() }, | ||
|
||
| inner, | ||
| )]; | ||
| Self { shard_sizes: new } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] | ||
| #[serde(rename_all = "snake_case")] | ||
| pub enum ManifestPreloadCondition { | ||
|
|
@@ -206,20 +284,33 @@ static DEFAULT_MANIFEST_PRELOAD_CONDITION: OnceLock<ManifestPreloadCondition> = | |
| #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] | ||
| pub struct ManifestConfig { | ||
| pub preload: Option<ManifestPreloadConfig>, | ||
| pub sharding: Option<ManifestShardingConfig>, | ||
| } | ||
|
|
||
| static DEFAULT_MANIFEST_PRELOAD_CONFIG: OnceLock<ManifestPreloadConfig> = OnceLock::new(); | ||
| static DEFAULT_MANIFEST_SHARDING_CONFIG: OnceLock<ManifestShardingConfig> = | ||
| OnceLock::new(); | ||
|
|
||
| impl ManifestConfig { | ||
| pub fn merge(&self, other: Self) -> Self { | ||
dcherian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Self { preload: other.preload.or(self.preload.clone()) } | ||
| Self { | ||
| preload: other.preload.or(self.preload.clone()), | ||
| // FIXME: why prioritize one over the other? | ||
|
||
| sharding: other.sharding.or(self.sharding.clone()), | ||
dcherian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| pub fn preload(&self) -> &ManifestPreloadConfig { | ||
| self.preload.as_ref().unwrap_or_else(|| { | ||
| DEFAULT_MANIFEST_PRELOAD_CONFIG.get_or_init(ManifestPreloadConfig::default) | ||
| }) | ||
| } | ||
|
|
||
| pub fn sharding(&self) -> &ManifestShardingConfig { | ||
| self.sharding.as_ref().unwrap_or_else(|| { | ||
| DEFAULT_MANIFEST_SHARDING_CONFIG.get_or_init(ManifestShardingConfig::default) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ use crate::format::flatbuffers::generated; | |
| use bytes::Bytes; | ||
| use flatbuffers::VerifierOptions; | ||
| use futures::{Stream, TryStreamExt}; | ||
| use itertools::Itertools; | ||
| use itertools::{Itertools, multiunzip, repeat_n}; | ||
| use serde::{Deserialize, Serialize}; | ||
| use thiserror::Error; | ||
|
|
||
|
|
@@ -21,12 +21,6 @@ use super::{ | |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| pub struct ManifestExtents(Vec<Range<u32>>); | ||
|
|
||
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| pub struct ManifestRef { | ||
| pub object_id: ManifestId, | ||
| pub extents: ManifestExtents, | ||
| } | ||
|
|
||
| impl ManifestExtents { | ||
| pub fn new(from: &[u32], to: &[u32]) -> Self { | ||
| let v = from | ||
|
|
@@ -37,9 +31,83 @@ impl ManifestExtents { | |
| Self(v) | ||
| } | ||
|
|
||
| pub fn contains(&self, coord: &[u32]) -> bool { | ||
| self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that)) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. On writes, we need to start checking that the indexes have the proper size for the metadata. |
||
| } | ||
|
|
||
| pub fn iter(&self) -> impl Iterator<Item = &Range<u32>> { | ||
| self.0.iter() | ||
| } | ||
|
|
||
| pub fn len(&self) -> usize { | ||
| self.0.len() | ||
| } | ||
|
|
||
| pub fn is_empty(&self) -> bool { | ||
| self.0.is_empty() | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||
| pub struct ManifestRef { | ||
| pub object_id: ManifestId, | ||
| pub extents: ManifestExtents, | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| pub struct ManifestShards(Vec<ManifestExtents>); | ||
|
|
||
| impl ManifestShards { | ||
| pub fn default(ndim: usize) -> Self { | ||
|
||
| Self(vec![ManifestExtents(repeat_n(0..u32::MAX, ndim).collect())]) | ||
dcherian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| pub fn is_empty(&self) -> bool { | ||
| self.0.is_empty() | ||
| } | ||
| pub fn from_edges(iter: impl IntoIterator<Item = Vec<u32>>) -> Self { | ||
dcherian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let res = iter | ||
| .into_iter() | ||
| .map(|x| x.into_iter().tuple_windows()) | ||
| .multi_cartesian_product() | ||
| .map(multiunzip) | ||
| .map(|(from, to): (Vec<u32>, Vec<u32>)| { | ||
| ManifestExtents::new(from.as_slice(), to.as_slice()) | ||
| }); | ||
| Self(res.collect()) | ||
| } | ||
|
|
||
| // Returns the index of shard_range that includes ChunkIndices | ||
| // This can be used at write time to split manifests based on the config | ||
| // and at read time to choose which manifest to query for chunk payload | ||
| pub fn which(&self, coord: &ChunkIndices) -> Result<usize, IcechunkFormatError> { | ||
| // shard_range[i] must bound ChunkIndices | ||
| // 0 <= return value < shard_range.len() | ||
| // it is possible that shard_range does not include a coord. say we have 2x2 shard grid | ||
| // but only shard (0,0) and shard (1,1) are populated with data. | ||
| // A coord located in (1, 0) should return Err | ||
| // Since shard_range need not form a regular grid, we must iterate through and find the first result. | ||
| // ManifestExtents in shard_range MUST NOT overlap with each other. How do we ensure this? | ||
dcherian marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // ndim must be the same | ||
| // debug_assert_eq!(coord.0.len(), shard_range[0].len()); | ||
| // FIXME: could optimize for unbounded single manifest | ||
| self.iter() | ||
| .enumerate() | ||
| .find(|(_, e)| e.contains(coord.0.as_slice())) | ||
| .map(|(i, _)| i) | ||
| .ok_or(IcechunkFormatError::from( | ||
| IcechunkFormatErrorKind::InvalidIndexForSharding { | ||
| coords: coord.clone(), | ||
| }, | ||
| )) | ||
| } | ||
|
|
||
| pub fn iter(&self) -> impl Iterator<Item = &ManifestExtents> { | ||
| self.0.iter() | ||
| } | ||
|
|
||
| pub fn len(&self) -> usize { | ||
| self.0.len() | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Error)] | ||
|
|
@@ -206,7 +274,7 @@ impl Manifest { | |
| } | ||
|
|
||
| if array_manifests.is_empty() { | ||
| // empty manifet | ||
| // empty manifest | ||
| return Ok(None); | ||
| } | ||
|
|
||
|
|
||

Uh oh!
There was an error while loading. Please reload this page.