Skip to content

Commit 8091248

Browse files
emmaling27Convex, Inc.
authored andcommitted
Add _schema_validation_progress system table (#40312)
Adds a new system table, `_schema_validation_progress`, for each component. We're making this a separate table from the `_schemas` table in each component to avoid conflicts, since almost all writes depend on the `_schemas` table and the progress will updated pretty frequently during schema validation. Progress isn't 100% accurate because the document counts might be stale, but this is pretty much the same way we track index backfill progress. GitOrigin-RevId: a4fe10c47d04e329352b48da7eab424c06ec2380
1 parent 08139ef commit 8091248

File tree

4 files changed

+122
-2
lines changed

4 files changed

+122
-2
lines changed

crates/migrations_model/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub type DatabaseVersion = i64;
3030
// migrations unless explicitly dropping support.
3131
// Add a user name next to the version when you make a change to highlight merge
3232
// conflicts.
33-
pub const DATABASE_VERSION: DatabaseVersion = 121; // nipunn
33+
pub const DATABASE_VERSION: DatabaseVersion = 122; // emma
3434

3535
pub struct MigrationExecutor<RT: Runtime> {
3636
pub db: Database<RT>,
@@ -76,6 +76,11 @@ impl<RT: Runtime> MigrationExecutor<RT> {
7676
.await?;
7777
MigrationCompletionCriterion::MigrationComplete(to_version)
7878
},
79+
122 => {
80+
// This is an empty migration because we added a new system
81+
// table for each component, _schema_validation_progress
82+
MigrationCompletionCriterion::MigrationComplete(to_version)
83+
},
7984
// NOTE: Make sure to increase DATABASE_VERSION when adding new migrations.
8085
_ => anyhow::bail!("Version did not define a migration! {}", to_version),
8186
};

crates/model/src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,11 @@ use crate::{
199199
exports::ExportsTable,
200200
external_packages::EXTERNAL_PACKAGES_TABLE,
201201
log_sinks::LOG_SINKS_TABLE,
202+
schema_validation_progress::{
203+
SchemaValidationProgressTable,
204+
SCHEMA_VALIDATION_PROGRESS_BY_SCHEMA_ID,
205+
SCHEMA_VALIDATION_PROGRESS_TABLE,
206+
},
202207
};
203208

204209
pub mod airbyte_import;
@@ -222,6 +227,7 @@ mod metrics;
222227
pub mod migrations;
223228
pub mod modules;
224229
pub mod scheduled_jobs;
230+
pub mod schema_validation_progress;
225231
pub mod session_requests;
226232
pub mod snapshot_imports;
227233
pub mod source_packages;
@@ -265,9 +271,10 @@ enum DefaultTableNumber {
265271
CanonicalUrls = 34,
266272
CronNextRun = 35,
267273
IndexBackfills = 36,
274+
SchemaValidationProgress = 37,
268275
// Keep this number and your user name up to date. The number makes it easy to know
269276
// what to use next. The username on the same line detects merge conflicts
270-
// Next Number - 37 - emma
277+
// Next Number - 38 - emma
271278
}
272279

273280
impl From<DefaultTableNumber> for TableNumber {
@@ -310,6 +317,7 @@ impl From<DefaultTableNumber> for &'static dyn ErasedSystemTable {
310317
DefaultTableNumber::CanonicalUrls => &CanonicalUrlsTable,
311318
DefaultTableNumber::CronNextRun => &CronNextRunTable,
312319
DefaultTableNumber::IndexBackfills => &IndexBackfillTable,
320+
DefaultTableNumber::SchemaValidationProgress => &SchemaValidationProgressTable,
313321
}
314322
}
315323
}
@@ -557,6 +565,7 @@ pub fn component_system_tables() -> Vec<&'static dyn ErasedSystemTable> {
557565
&ModulesTable,
558566
&UdfConfigTable,
559567
&SourcePackagesTable,
568+
&SchemaValidationProgressTable,
560569
]
561570
}
562571

@@ -626,6 +635,7 @@ pub static FIRST_SEEN_TABLE: LazyLock<BTreeMap<TableName, DatabaseVersion>> = La
626635
FUNCTION_HANDLES_TABLE.clone() => 102,
627636
CANONICAL_URLS_TABLE.clone() => 116,
628637
INDEX_BACKFILLS_TABLE.clone() => 120,
638+
SCHEMA_VALIDATION_PROGRESS_TABLE.clone() => 122,
629639
}
630640
});
631641

@@ -652,6 +662,7 @@ pub static FIRST_SEEN_INDEX: LazyLock<BTreeMap<IndexName, DatabaseVersion>> = La
652662
BY_COMPONENT_PATH_INDEX.name() => 102,
653663
EXPORTS_BY_REQUESTOR.name() => 110,
654664
INDEX_BACKFILLS_BY_INDEX_ID.name() => 120,
665+
SCHEMA_VALIDATION_PROGRESS_BY_SCHEMA_ID.name() => 122,
655666
}
656667
});
657668

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
pub mod types;
2+
3+
use std::sync::LazyLock;
4+
5+
use common::document::CREATION_TIME_FIELD_PATH;
6+
use database::system_tables::{
7+
SystemIndex,
8+
SystemTable,
9+
};
10+
use value::{
11+
FieldPath,
12+
TableName,
13+
};
14+
15+
pub static SCHEMA_VALIDATION_PROGRESS_TABLE: LazyLock<TableName> = LazyLock::new(|| {
16+
"_schema_validation_progress"
17+
.parse()
18+
.expect("Invalid built-in _schema_validation_progress table")
19+
});
20+
21+
pub static SCHEMA_VALIDATION_PROGRESS_BY_SCHEMA_ID: LazyLock<
22+
SystemIndex<SchemaValidationProgressTable>,
23+
> = LazyLock::new(|| {
24+
SystemIndex::new(
25+
"by_schema_id",
26+
[&SCHEMA_ID_FIELD, &CREATION_TIME_FIELD_PATH],
27+
)
28+
.unwrap()
29+
});
30+
31+
static SCHEMA_ID_FIELD: LazyLock<FieldPath> =
32+
LazyLock::new(|| "schemaId".parse().expect("invalid schemaId field"));
33+
34+
pub struct SchemaValidationProgressTable;
35+
36+
impl SystemTable for SchemaValidationProgressTable {
37+
type Metadata = types::SchemaValidationProgressMetadata;
38+
39+
fn table_name() -> &'static TableName {
40+
&SCHEMA_VALIDATION_PROGRESS_TABLE
41+
}
42+
43+
fn indexes() -> Vec<SystemIndex<Self>> {
44+
vec![SCHEMA_VALIDATION_PROGRESS_BY_SCHEMA_ID.clone()]
45+
}
46+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use serde::{
2+
Deserialize,
3+
Serialize,
4+
};
5+
use value::{
6+
codegen_convex_serialization,
7+
DeveloperDocumentId,
8+
};
9+
10+
#[derive(Clone, Debug, Eq, PartialEq)]
11+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
12+
pub struct SchemaValidationProgressMetadata {
13+
/// The ID of the schema being validated. Should correspond to a document in
14+
/// the _schemas table in `Pending` state.
15+
pub schema_id: DeveloperDocumentId,
16+
/// The number of documents that have been validated so far.
17+
pub num_docs_validated: u64,
18+
/// The number of total documents that need to be validated. Note this is
19+
/// approximate because there could be changes since the time we wrote this
20+
/// value from the table summary when the schema is submitted as pending.
21+
/// It's possible for num_docs_validated to exceed total_docs.
22+
pub total_docs: u64,
23+
}
24+
25+
#[derive(Serialize, Deserialize)]
26+
#[serde(rename_all = "camelCase")]
27+
pub struct SerializedSchemaValidationProgressMetadata {
28+
pub schema_id: String,
29+
pub num_docs_validated: i64,
30+
pub total_docs: i64,
31+
}
32+
33+
impl From<SchemaValidationProgressMetadata> for SerializedSchemaValidationProgressMetadata {
34+
fn from(metadata: SchemaValidationProgressMetadata) -> Self {
35+
SerializedSchemaValidationProgressMetadata {
36+
schema_id: metadata.schema_id.to_string(),
37+
num_docs_validated: metadata.num_docs_validated as i64,
38+
total_docs: metadata.total_docs as i64,
39+
}
40+
}
41+
}
42+
43+
impl TryFrom<SerializedSchemaValidationProgressMetadata> for SchemaValidationProgressMetadata {
44+
type Error = anyhow::Error;
45+
46+
fn try_from(serialized: SerializedSchemaValidationProgressMetadata) -> anyhow::Result<Self> {
47+
Ok(SchemaValidationProgressMetadata {
48+
schema_id: serialized.schema_id.parse()?,
49+
num_docs_validated: serialized.num_docs_validated as u64,
50+
total_docs: serialized.total_docs as u64,
51+
})
52+
}
53+
}
54+
55+
codegen_convex_serialization!(
56+
SchemaValidationProgressMetadata,
57+
SerializedSchemaValidationProgressMetadata
58+
);

0 commit comments

Comments
 (0)