Skip to content

Commit 0b7c297

Browse files
committed
fix review comments
1 parent 1ac0b04 commit 0b7c297

File tree

6 files changed

+91
-32
lines changed

6 files changed

+91
-32
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ build_exceptions! {
511511
IllegalUser(2218),
512512
}
513513

514-
// Database and Catalog Management Errors [2301-2317, 2321-2326]
514+
// Database and Catalog Management Errors [2301-2317, 2321-2327]
515515
build_exceptions! {
516516
/// Database already exists
517517
DatabaseAlreadyExists(2301),
@@ -551,6 +551,8 @@ build_exceptions! {
551551
RowAccessPolicyAlreadyExists(2324),
552552
/// General failures met while garbage collecting database meta
553553
GeneralDbGcFailure(2325),
554+
/// Table snapshot expired
555+
TableSnapshotExpired(2327),
554556
}
555557

556558
// Stage and Connection Errors [2501-2505, 2510-2512]

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2974,23 +2974,48 @@ impl SchemaApiTestSuite {
29742974
{
29752975
let table = util.get_table().await.unwrap();
29762976
let table_id = table.ident.table_id;
2977-
let lvt_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
2977+
let small_ts = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
29782978
let lvt_time = DateTime::<Utc>::from_timestamp(2_000, 0).unwrap();
2979+
let big_time = DateTime::<Utc>::from_timestamp(3_000, 0).unwrap();
2980+
2981+
// LVT no set.
2982+
let mut new_table_meta = table.meta.clone();
2983+
new_table_meta.comment = "lvt no set".to_string();
2984+
let req = UpdateTableMetaReq {
2985+
table_id,
2986+
seq: MatchSeq::Exact(table.ident.seq),
2987+
new_table_meta: new_table_meta.clone(),
2988+
base_snapshot_location: None,
2989+
lvt_check: Some(TableLvtCheck {
2990+
tenant: tenant.clone(),
2991+
time: small_ts,
2992+
}),
2993+
};
2994+
let result = mt
2995+
.update_multi_table_meta(UpdateMultiTableMetaReq {
2996+
update_table_metas: vec![(req, table.as_ref().clone())],
2997+
..Default::default()
2998+
})
2999+
.await;
3000+
assert!(result.is_ok());
3001+
let table = util.get_table().await.unwrap();
3002+
assert_eq!(table.meta.comment, "lvt no set");
3003+
3004+
let lvt_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
29793005
mt.set_table_lvt(&lvt_ident, &LeastVisibleTime::new(lvt_time))
29803006
.await?;
29813007

2982-
// LVT was changed.
3008+
// LVT is smaller.
29833009
let mut new_table_meta = table.meta.clone();
29843010
new_table_meta.comment = "lvt guard should fail".to_string();
2985-
let small_ts = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
29863011
let req = UpdateTableMetaReq {
29873012
table_id,
29883013
seq: MatchSeq::Exact(table.ident.seq),
29893014
new_table_meta: new_table_meta.clone(),
29903015
base_snapshot_location: None,
29913016
lvt_check: Some(TableLvtCheck {
29923017
tenant: tenant.clone(),
2993-
lvt: LeastVisibleTime::new(small_ts),
3018+
time: small_ts,
29943019
}),
29953020
};
29963021
let result = mt
@@ -3001,7 +3026,7 @@ impl SchemaApiTestSuite {
30013026
.await;
30023027
assert!(result.is_err());
30033028

3004-
// LVT unchanged.
3029+
// LVT is large enough.
30053030
let table = util.get_table().await.unwrap();
30063031
let mut ok_table_meta = table.meta.clone();
30073032
ok_table_meta.comment = "lvt guard success".to_string();
@@ -3012,7 +3037,7 @@ impl SchemaApiTestSuite {
30123037
base_snapshot_location: None,
30133038
lvt_check: Some(TableLvtCheck {
30143039
tenant: tenant.clone(),
3015-
lvt: LeastVisibleTime::new(lvt_time),
3040+
time: big_time,
30163041
}),
30173042
};
30183043
mt.update_multi_table_meta(UpdateMultiTableMetaReq {

src/meta/api/src/table_api.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
3232
use databend_common_meta_app::app_error::StreamAlreadyExists;
3333
use databend_common_meta_app::app_error::StreamVersionMismatched;
3434
use databend_common_meta_app::app_error::TableAlreadyExists;
35+
use databend_common_meta_app::app_error::TableSnapshotExpired;
3536
use databend_common_meta_app::app_error::TableVersionMismatched;
3637
use databend_common_meta_app::app_error::UndropTableHasNoHistory;
3738
use databend_common_meta_app::app_error::UndropTableWithNoDropTime;
@@ -99,7 +100,6 @@ use databend_common_meta_types::MatchSeqExt;
99100
use databend_common_meta_types::MetaError;
100101
use databend_common_meta_types::MetaId;
101102
use databend_common_meta_types::SeqV;
102-
use databend_common_meta_types::TxnCondition;
103103
use databend_common_meta_types::TxnGetRequest;
104104
use databend_common_meta_types::TxnGetResponse;
105105
use databend_common_meta_types::TxnOp;
@@ -1244,10 +1244,26 @@ where
12441244
// Add LVT check if provided
12451245
if let Some(check) = req.lvt_check.as_ref() {
12461246
let lvt_ident = LeastVisibleTimeIdent::new(&check.tenant, req.table_id);
1247-
txn.condition.push(TxnCondition::eq_value(
1248-
lvt_ident.to_string_key(),
1249-
serialize_struct(&check.lvt)?,
1250-
));
1247+
let res = self.get_pb(&lvt_ident).await?;
1248+
let (seq, current_lvt) = match res {
1249+
Some(v) => (v.seq, Some(v.data)),
1250+
None => (0, None),
1251+
};
1252+
if let Some(current_lvt) = current_lvt {
1253+
if current_lvt.time > check.time {
1254+
return Err(KVAppError::AppError(AppError::TableSnapshotExpired(
1255+
TableSnapshotExpired::new(
1256+
req.table_id,
1257+
format!(
1258+
"snapshot timestamp {:?} is older than the table's least visible time {:?}",
1259+
check.time, current_lvt.time
1260+
),
1261+
),
1262+
)));
1263+
}
1264+
}
1265+
// no other one has updated LVT since we read it
1266+
txn.condition.push(txn_cond_seq(&lvt_ident, Eq, seq));
12511267
}
12521268

12531269
txn.if_then

src/meta/app/src/app_error.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,22 @@ impl UnknownTableId {
584584
}
585585
}
586586

587+
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
588+
#[error("TableSnapshotExpired: {table_id} while {context}")]
589+
pub struct TableSnapshotExpired {
590+
table_id: u64,
591+
context: String,
592+
}
593+
594+
impl TableSnapshotExpired {
595+
pub fn new(table_id: u64, context: impl Into<String>) -> Self {
596+
Self {
597+
table_id,
598+
context: context.into(),
599+
}
600+
}
601+
}
602+
587603
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
588604
#[error(
589605
"VirtualColumnIdOutBound: the virtual column id `{column_id}` is outside the range `{lower}` to `{upper}`"
@@ -1080,6 +1096,9 @@ pub enum AppError {
10801096
#[error(transparent)]
10811097
UnknownTableId(#[from] UnknownTableId),
10821098

1099+
#[error(transparent)]
1100+
TableSnapshotExpired(#[from] TableSnapshotExpired),
1101+
10831102
#[error(transparent)]
10841103
TxnRetryMaxTimes(#[from] TxnRetryMaxTimes),
10851104

@@ -1293,6 +1312,8 @@ impl AppErrorMessage for UnknownTable {
12931312

12941313
impl AppErrorMessage for UnknownTableId {}
12951314

1315+
impl AppErrorMessage for TableSnapshotExpired {}
1316+
12961317
impl AppErrorMessage for UnknownDatabaseId {}
12971318

12981319
impl AppErrorMessage for TableVersionMismatched {}
@@ -1623,6 +1644,7 @@ impl From<AppError> for ErrorCode {
16231644
AppError::UnknownDatabase(err) => ErrorCode::UnknownDatabase(err.message()),
16241645
AppError::UnknownDatabaseId(err) => ErrorCode::UnknownDatabaseId(err.message()),
16251646
AppError::UnknownTableId(err) => ErrorCode::UnknownTableId(err.message()),
1647+
AppError::TableSnapshotExpired(err) => ErrorCode::TableSnapshotExpired(err.message()),
16261648
AppError::UnknownTable(err) => ErrorCode::UnknownTable(err.message()),
16271649
AppError::UnknownCatalog(err) => ErrorCode::UnknownCatalog(err.message()),
16281650
AppError::DatabaseAlreadyExists(err) => ErrorCode::DatabaseAlreadyExists(err.message()),

src/meta/app/src/schema/table/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use super::CatalogInfo;
4141
use super::CreateOption;
4242
use super::DatabaseId;
4343
use super::MarkedDeletedIndexMeta;
44-
use crate::schema::LeastVisibleTime;
4544
use crate::schema::constraint::Constraint;
4645
use crate::schema::database_name_ident::DatabaseNameIdent;
4746
use crate::schema::table_niv::TableNIV;
@@ -817,14 +816,13 @@ pub struct UpdateTableMetaReq {
817816
pub new_table_meta: TableMeta,
818817
pub base_snapshot_location: Option<String>,
819818
/// Optional optimistic LVT check.
820-
/// When set, the table LVT must be equal to the provided value.
821819
pub lvt_check: Option<TableLvtCheck>,
822820
}
823821

824822
#[derive(Clone, Debug, PartialEq, Eq)]
825823
pub struct TableLvtCheck {
826824
pub tenant: Tenant,
827-
pub lvt: LeastVisibleTime,
825+
pub time: DateTime<Utc>,
828826
}
829827

830828
#[derive(Clone, Debug, PartialEq, Eq)]

src/query/ee/src/table_ref/handler.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use databend_common_exception::Result;
2323
use databend_common_meta_app::schema::SnapshotRef;
2424
use databend_common_meta_app::schema::TableLvtCheck;
2525
use databend_common_meta_app::schema::UpdateTableMetaReq;
26-
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
2726
use databend_common_meta_types::MatchSeq;
2827
use databend_common_sql::plans::CreateTableRefPlan;
2928
use databend_common_sql::plans::DropTableRefPlan;
@@ -95,6 +94,14 @@ impl TableRefHandler for RealTableRefHandler {
9594
.read_table_snapshot_with_location(snapshot_loc)
9695
.await?
9796
{
97+
if snapshot.timestamp.is_none() {
98+
return Err(ErrorCode::IllegalReference(format!(
99+
"Table {} snapshot lacks required timestamp. This table was created with a significantly outdated version \
100+
that is no longer directly supported by the current version and requires migration. \
101+
Please contact us at https://www.databend.com/contact-us/ or email [email protected]",
102+
table_id
103+
)));
104+
}
98105
let mut new_snapshot = TableSnapshot::try_from_previous(
99106
snapshot.clone(),
100107
Some(seq),
@@ -122,22 +129,11 @@ impl TableRefHandler for RealTableRefHandler {
122129
.as_ref()
123130
.is_some_and(|v| !matches!(v, NavigationPoint::TableRef { .. }))
124131
{
125-
if let Some(ts) = prev_ts {
126-
let current_lvt = catalog
127-
.get_table_lvt(&LeastVisibleTimeIdent::new(&tenant, table_id))
128-
.await?;
129-
if let Some(lvt) = current_lvt {
130-
if ts < lvt.time {
131-
return Err(ErrorCode::IllegalReference(format!(
132-
"Cannot create {} '{}', because the referred snapshot timestamp {} is older than table least visible time {}",
133-
plan.ref_type, plan.ref_name, ts, lvt.time,
134-
)));
135-
}
136-
lvt_check = Some(TableLvtCheck {
137-
tenant: tenant.clone(),
138-
lvt,
139-
});
140-
}
132+
if let Some(time) = prev_ts {
133+
lvt_check = Some(TableLvtCheck {
134+
tenant: tenant.clone(),
135+
time,
136+
});
141137
}
142138
}
143139

0 commit comments

Comments
 (0)