Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2736,6 +2736,7 @@ impl Coordinator {
since: None,
status_collection_id,
timeline: Some(timeline.clone()),
primary: None,
}
};

Expand Down Expand Up @@ -2766,10 +2767,9 @@ impl Coordinator {
let next_version = version.bump();
let primary_collection =
versions.get(&next_version).map(|(gid, _desc)| gid).copied();
let collection_desc = CollectionDescription::for_table(
desc.clone(),
primary_collection,
);
let mut collection_desc =
CollectionDescription::for_table(desc.clone());
collection_desc.primary = primary_collection;

(*gid, collection_desc)
});
Expand Down Expand Up @@ -2808,13 +2808,8 @@ impl Coordinator {
compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
}
CatalogItem::ContinualTask(ct) => {
let collection_desc = CollectionDescription {
desc: ct.desc.clone(),
data_source: DataSource::Other,
since: ct.initial_as_of.clone(),
status_collection_id: None,
timeline: None,
};
let collection_desc =
CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
if ct.global_id().is_system() && collection_desc.since.is_none() {
// We need a non-0 since to make as_of selection work. Fill it in below with
// the `bootstrap_builtin_continual_tasks` call, which can only be run after
Expand Down Expand Up @@ -2859,6 +2854,7 @@ impl Coordinator {
since: None,
status_collection_id: None,
timeline: None,
primary: None,
};
collections.push((sink.global_id, collection_desc));
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,7 @@ impl Coordinator {
since: None,
status_collection_id: None,
timeline: None,
primary: None,
};
let collections = vec![(id, collection_desc)];

Expand Down
7 changes: 5 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ impl Coordinator {
timeline: Some(source.timeline),
since: None,
status_collection_id,
primary: None,
},
));
}
Expand Down Expand Up @@ -1229,8 +1230,7 @@ impl Coordinator {
.desc
.at_version(RelationVersionSelector::Specific(relation_version));
// We assert above we have a single version, and thus we are the primary.
let collection_desc =
CollectionDescription::for_table(relation_desc, None);
let collection_desc = CollectionDescription::for_table(relation_desc);
let collections = vec![(global_id, collection_desc)];

let compaction_window = table
Expand Down Expand Up @@ -1283,6 +1283,7 @@ impl Coordinator {
since: None,
status_collection_id,
timeline: Some(timeline.clone()),
primary: None,
};

let collections = vec![(global_id, collection_desc)];
Expand Down Expand Up @@ -1319,6 +1320,7 @@ impl Coordinator {
since: None,
status_collection_id: None,
timeline: Some(timeline.clone()),
primary: None,
};
let collections = vec![(global_id, collection_desc)];
let read_policies = coord
Expand Down Expand Up @@ -4492,6 +4494,7 @@ impl Coordinator {
since: None,
status_collection_id,
timeline: Some(source.timeline.clone()),
primary: None,
},
));

Expand Down
10 changes: 2 additions & 8 deletions src/adapter/src/coord/sequencer/inner/create_continual_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use mz_sql::plan;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::Statement;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_client::controller::CollectionDescription;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::OptimizerNotice;

Expand Down Expand Up @@ -148,13 +148,7 @@ impl Coordinator {
None,
vec![(
global_id,
CollectionDescription {
desc,
data_source: DataSource::Other,
since: Some(as_of),
status_collection_id: None,
timeline: None,
},
CollectionDescription::for_other(desc, Some(as_of)),
)],
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use mz_sql::plan;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_client::controller::CollectionDescription;
use std::collections::BTreeMap;
use timely::progress::Antichain;
use tracing::Span;
Expand Down Expand Up @@ -696,13 +696,7 @@ impl Coordinator {
None,
vec![(
global_id,
CollectionDescription {
desc: output_desc,
data_source: DataSource::Other,
since: Some(storage_as_of),
status_collection_id: None,
timeline: None,
},
CollectionDescription::for_other(output_desc, Some(storage_as_of)),
)],
)
.await
Expand Down
22 changes: 13 additions & 9 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,7 @@ pub enum DataSource<T> {
/// Data comes from external HTTP requests pushed to Materialize.
Webhook,
/// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
Table {
/// This table has had columns added or dropped to it, so we're now a
/// "view" over the "primary" Table/collection. Within the
/// `storage-controller` we track the primary as a dependency.
primary: Option<GlobalId>,
},
Table,
/// This source's data does not need to be managed by the storage
/// controller, e.g. it's a materialized view or the catalog collection.
Other,
Expand All @@ -151,6 +146,13 @@ pub struct CollectionDescription<T> {
pub status_collection_id: Option<GlobalId>,
/// The timeline of the source. Absent for materialized views, continual tasks, etc.
pub timeline: Option<Timeline>,
/// The primary of this collection.
///
/// Multiple storage collections can point to the same persist shard,
/// possibly with different schemas. In such a configuration, we select one
/// of the involved collections as the primary, which "owns" the persist
/// shard. All other involved collections have a dependency on the primary.
pub primary: Option<GlobalId>,
}

impl<T> CollectionDescription<T> {
Expand All @@ -162,17 +164,19 @@ impl<T> CollectionDescription<T> {
since,
status_collection_id: None,
timeline: None,
primary: None,
}
}

/// Create a CollectionDescription for a table.
pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self {
pub fn for_table(desc: RelationDesc) -> Self {
Self {
desc,
data_source: DataSource::Table { primary },
data_source: DataSource::Table,
since: None,
status_collection_id: None,
timeline: Some(Timeline::EpochMilliseconds),
primary: None,
}
}
}
Expand Down Expand Up @@ -736,7 +740,7 @@ impl<T> DataSource<T> {
/// source using txn-wal.
pub fn in_txns(&self) -> bool {
match self {
DataSource::Table { .. } => true,
DataSource::Table => true,
DataSource::Other
| DataSource::Ingestion(_)
| DataSource::IngestionExport { .. }
Expand Down
71 changes: 30 additions & 41 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,24 +902,25 @@ where
Ok(())
}

/// Determine if this collection has another dependency.
///
/// Currently, collections have either 0 or 1 dependencies.
/// Returns the given collection's dependencies.
fn determine_collection_dependencies(
&self,
self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
source_id: GlobalId,
data_source: &DataSource<T>,
collection_desc: &CollectionDescription<T>,
) -> Result<Vec<GlobalId>, StorageError<T>> {
let dependencies = match &data_source {
let mut dependencies = Vec::new();

if let Some(id) = collection_desc.primary {
dependencies.push(id);
}

match &collection_desc.data_source {
DataSource::Introspection(_)
| DataSource::Webhook
| DataSource::Table { primary: None }
| DataSource::Table
| DataSource::Progress
| DataSource::Other => Vec::new(),
DataSource::Table {
primary: Some(primary),
} => vec![*primary],
| DataSource::Other => (),
DataSource::IngestionExport {
ingestion_id,
data_config,
Expand All @@ -935,20 +936,18 @@ where
};

match data_config.envelope {
SourceEnvelope::CdcV2 => Vec::new(),
_ => vec![ingestion.remap_collection_id],
SourceEnvelope::CdcV2 => (),
_ => dependencies.push(ingestion.remap_collection_id),
}
}
// Ingestions depend on their remap collection.
DataSource::Ingestion(ingestion) => {
if ingestion.remap_collection_id == source_id {
vec![]
} else {
vec![ingestion.remap_collection_id]
if ingestion.remap_collection_id != source_id {
dependencies.push(ingestion.remap_collection_id);
}
}
DataSource::Sink { desc } => vec![desc.sink.from],
};
DataSource::Sink { desc } => dependencies.push(desc.sink.from),
}

Ok(dependencies)
}
Expand Down Expand Up @@ -1342,12 +1341,9 @@ where
let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
for (key, (mut changes, frontier)) in collections_net {
if !changes.is_empty() {
// If the table has a "primary" collection, let that collection drive compaction.
// If the collection has a "primary" collection, let that primary drive compaction.
let collection = collections.get(&key).expect("must still exist");
let should_emit_persist_compaction = !matches!(
collection.description.data_source,
DataSource::Table { primary: Some(_) }
);
let should_emit_persist_compaction = collection.description.primary.is_none();

if frontier.is_empty() {
info!(id = %key, "removing collection state because the since advanced to []!");
Expand Down Expand Up @@ -1906,7 +1902,7 @@ where
| DataSource::Progress
| DataSource::Other => {}
DataSource::Sink { .. } => {}
DataSource::Table { .. } => {
DataSource::Table => {
let register_ts = register_ts.expect(
"caller should have provided a register_ts when creating a table",
);
Expand Down Expand Up @@ -1963,7 +1959,7 @@ where
Sink(GlobalId),
}
to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
DataSource::Table => DependencyOrder::Table(Reverse(*id)),
DataSource::Sink { .. } => DependencyOrder::Sink(*id),
_ => DependencyOrder::Collection(*id),
});
Expand All @@ -1977,11 +1973,8 @@ where
let data_shard_since = since_handle.since().clone();

// Determine if this collection has any dependencies.
let storage_dependencies = self.determine_collection_dependencies(
&*self_collections,
id,
&description.data_source,
)?;
let storage_dependencies =
self.determine_collection_dependencies(&*self_collections, id, &description)?;

// Determine the initial since of the collection.
let initial_since = match storage_dependencies
Expand Down Expand Up @@ -2088,7 +2081,7 @@ where

self_collections.insert(id, collection_state);
}
DataSource::Table { .. } => {
DataSource::Table => {
// See comment on self.initial_txn_upper on why we're doing
// this.
if is_in_txns(id, &metadata)
Expand Down Expand Up @@ -2265,7 +2258,7 @@ where
.ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

// TODO(alter_table): Support changes to sources.
if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
if existing.description.data_source != DataSource::Table {
return Err(StorageError::IdentifierInvalid(existing_collection));
}

Expand Down Expand Up @@ -2352,15 +2345,11 @@ where
.expect("existing collection missing");

// A higher level should already be asserting this, but let's make sure.
assert!(matches!(
existing.description.data_source,
DataSource::Table { primary: None }
));
assert_eq!(existing.description.data_source, DataSource::Table);
assert_none!(existing.description.primary);

// The existing version of the table will depend on the new version.
existing.description.data_source = DataSource::Table {
primary: Some(new_collection),
};
existing.description.primary = Some(new_collection);
existing.storage_dependencies.push(new_collection);

// Copy over the frontiers from the previous version.
Expand All @@ -2378,8 +2367,8 @@ where
let mut changes = ChangeBatch::new();
changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

// Note: The new collection is now the "primary collection" so we specify `None` here.
let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
// Note: The new collection is now the "primary collection".
let collection_desc = CollectionDescription::for_table(new_desc.clone());
let collection_meta = CollectionMetadata {
persist_location: self.persist_location.clone(),
relation_desc: collection_desc.desc.clone(),
Expand Down
Loading