Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,33 @@ impl UnopenedPersistCatalogState {
}
}

// If we are planning to write to the catalog shards, we need to ensure they have schemas
// registered _before_ opening the write handles.
let diagnostics = Diagnostics {
shard_name: CATALOG_SHARD_NAME.to_string(),
handle_purpose: "durable catalog register schemas".to_string(),
};
persist_client
.register_schema::<SourceData, (), Timestamp, StorageDiff>(
catalog_shard_id,
&desc(),
&UnitSchema,
diagnostics.clone(),
)
.await
.expect("valid usage")
.expect("valid schema");
persist_client
.register_schema::<(), (), Timestamp, StorageDiff>(
upgrade_shard_id,
&UnitSchema,
&UnitSchema,
diagnostics.clone(),
)
.await
.expect("valid usage")
.expect("valid schema");

let open_handles_start = Instant::now();
info!("startup: envd serve: catalog init: open handles beginning");
let since_handle = persist_client
Expand Down
14 changes: 13 additions & 1 deletion src/durable-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ impl<C: DurableCacheCodec> DurableCache<C> {
shard_name: format!("{purpose}_cache"),
handle_purpose: format!("durable persist cache: {purpose}"),
};

let (key_schema, val_schema) = C::schemas();
persist
.register_schema::<C::KeyCodec, C::ValCodec, u64, i64>(
shard_id,
&key_schema,
&val_schema,
diagnostics.clone(),
)
.await
.expect("valid usage")
.expect("valid schema");

let since_handle = persist
.open_critical_since(
shard_id,
Expand All @@ -90,7 +103,6 @@ impl<C: DurableCacheCodec> DurableCache<C> {
)
.await
.expect("invalid usage");
let (key_schema, val_schema) = C::schemas();
let (mut write, read) = persist
.open(
shard_id,
Expand Down
5 changes: 5 additions & 0 deletions src/persist-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum InvalidUsage<T> {
CodecMismatch(Box<CodecMismatch>),
/// An invalid usage of [crate::batch::Batch::rewrite_ts].
InvalidRewrite(String),
/// Attempted to append to a shard without a registered write schema.
WriteWithoutSchema,
}

impl<T: Debug> std::fmt::Display for InvalidUsage<T> {
Expand Down Expand Up @@ -126,6 +128,9 @@ impl<T: Debug> std::fmt::Display for InvalidUsage<T> {
}
InvalidUsage::CodecMismatch(err) => std::fmt::Display::fmt(err, f),
InvalidUsage::InvalidRewrite(err) => write!(f, "invalid rewrite: {err}"),
InvalidUsage::WriteWithoutSchema => {
f.write_str("attempted write without a registered schema")
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/persist-client/src/internal/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,22 @@ where
})
}

/// See [crate::PersistClient::find_schema].
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
self.state
.read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
// The common case is that the requested schema is a recent one, so as a minor
// optimization, do this search in reverse order.
let mut schemas = state.collections.schemas.iter().rev();
schemas
.find(|(_, x)| {
K::decode_schema(&x.key) == *key_schema
&& V::decode_schema(&x.val) == *val_schema
})
.map(|(id, _)| *id)
})
}

/// Returns whether the current's state `since` and `upper` are both empty.
///
/// Due to sharing state with other handles, successive reads to this fn or any other may
Expand Down
2 changes: 0 additions & 2 deletions src/persist-client/src/internal/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
/// A key and value `Schema` of data written to a batch or shard.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
// TODO: Remove the Option once this finishes rolling out and all shards
// have a registered schema.
/// Id under which this schema is registered in the shard's schema registry,
/// if any.
pub id: Option<SchemaId>,
Expand Down
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@ where
self.applier.latest_schema()
}

/// See [crate::PersistClient::find_schema].
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
self.applier.find_schema(key_schema, val_schema)
}

/// See [crate::PersistClient::compare_and_evolve_schema].
///
/// TODO: Unify this with [Self::register_schema]?
Expand Down
11 changes: 7 additions & 4 deletions src/persist-client/src/internal/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1576,15 +1576,18 @@ where
}
None => {
info!(
"register_schemas got {:?} expected {:?}",
"register_schemas got ({:?}, {:?}), expected {:?}",
key_schema,
val_schema,
self.schemas
.iter()
.map(|(id, x)| (id, K::decode_schema(&x.key)))
.map(|(id, x)| {
let key = K::decode_schema(&x.key);
let val = V::decode_schema(&x.val);
(id, key, val)
})
.collect::<Vec<_>>()
);
// Until we implement persist schema changes, only allow at most
// one registered schema.
Break(NoOpStateTransition(None))
}
}
Expand Down
81 changes: 57 additions & 24 deletions src/persist-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::{instrument, soft_assert_or_log};
use mz_ore::instrument;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
Expand Down Expand Up @@ -490,10 +490,6 @@ impl PersistClient {
///
/// Use this to save latency and a bit of persist traffic if you're just
/// going to immediately drop or expire the [ReadHandle].
///
/// The `_schema` parameter is currently unused, but should be an object
/// that represents the schema of the data in the shard. This will be required
/// in the future.
#[instrument(level = "debug", fields(shard = %shard_id))]
pub async fn open_writer<K, V, T, D>(
&self,
Expand All @@ -510,25 +506,7 @@ impl PersistClient {
{
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

// TODO: Because schemas are ordered, as part of the persist schema
// changes work, we probably want to build some way to allow persist
// users to control the order. For example, maybe a
// `PersistClient::compare_and_append_schema(current_schema_id,
// next_schema)`. Presumably this would then be passed in to open_writer
// instead of us implicitly registering it here.
// NB: The overwhelming common case is that this schema is already
// registered. In this case, the cmd breaks early and nothing is
// written to (or read from) CRDB.
let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
maintenance.start_performing(&machine, &gc);
soft_assert_or_log!(
schema_id.is_some(),
"unable to register schemas {:?} {:?}",
key_schema,
val_schema,
);

let schema_id = machine.find_schema(&*key_schema, &*val_schema);
let writer_id = WriterId::new();
let schemas = Schemas {
id: schema_id,
Expand Down Expand Up @@ -718,6 +696,61 @@ impl PersistClient {
Ok(machine.latest_schema())
}

/// Returns the ID of the given schema, if known at the current state.
pub async fn find_schema<K, V, T, D>(
&self,
shard_id: ShardId,
key_schema: &K::Schema,
val_schema: &V::Schema,
diagnostics: Diagnostics,
) -> Result<Option<SchemaId>, InvalidUsage<T>>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Codec64 + Send + Sync,
{
let machine = self
.make_machine::<K, V, T, D>(shard_id, diagnostics)
.await?;
Ok(machine.find_schema(key_schema, val_schema))
}

/// Registers a schema for the given shard.
///
/// Returns the given schema's ID if the registration succeeds, and `None`
/// otherwise. Schema registration succeeds in two cases:
/// a) No schema was currently registered for the shard.
/// b) The given schema is already registered for the shard.
///
/// To evolve an existing schema, use
/// [PersistClient::compare_and_evolve_schema].
//
// TODO: unify with `compare_and_evolve_schema`
pub async fn register_schema<K, V, T, D>(
&self,
shard_id: ShardId,
key_schema: &K::Schema,
val_schema: &V::Schema,
diagnostics: Diagnostics,
) -> Result<Option<SchemaId>, InvalidUsage<T>>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Codec64 + Send + Sync,
{
let machine = self
.make_machine::<K, V, T, D>(shard_id, diagnostics)
.await?;
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
maintenance.start_performing(&machine, &gc);

Ok(schema_id)
}

/// Registers a new latest schema for the given shard.
///
/// This new schema must be [backward_compatible] with all previous schemas
Expand Down
6 changes: 6 additions & 0 deletions src/persist-client/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ mod tests {
let schema0 = StringsSchema(vec![false]);
let schema1 = StringsSchema(vec![false, true]);

client
.register_schema::<Strings, (), u64, i64>(shard_id, &schema0, &UnitSchema, d.clone())
.await
.unwrap()
.unwrap();

let write0 = client
.open_writer::<Strings, (), u64, i64>(
shard_id,
Expand Down
6 changes: 6 additions & 0 deletions src/persist-client/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,12 @@ where
}
}

// If we are writing any data, we require that the write schema has previously been
// registered for the shard.
if batches.iter().any(|b| b.batch.len > 0) && self.schema_id().is_none() {
return Err(InvalidUsage::WriteWithoutSchema);
}

let lower = expected_upper.clone();
let upper = new_upper;
let since = Antichain::from_elem(T::minimum());
Expand Down
18 changes: 18 additions & 0 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,24 @@ where
// somewhere
debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

// If we are planning to write to the shard, we need to ensure it has a schema
// registered _before_ opening the write handle.
if !this.read_only || migrated_storage_collections.contains(&id) {
persist_client
.register_schema::<SourceData, (), T, StorageDiff>(
metadata.data_shard,
&metadata.relation_desc,
&UnitSchema,
Diagnostics {
shard_name: id.to_string(),
handle_purpose: format!("controller register schema"),
},
)
.await
.expect("valid usage")
.expect("valid schema");
}

let (write, mut since_handle) = this
.open_data_handles(
&id,
Expand Down
Loading