Skip to content

Commit e466b11

Browse files
committed
table lvt
1 parent 6a463d9 commit e466b11

File tree

20 files changed

+578
-102
lines changed

20 files changed

+578
-102
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ build_exceptions! {
511511
IllegalUser(2218),
512512
}
513513

514-
// Database and Catalog Management Errors [2301-2317, 2321-2323]
514+
// Database and Catalog Management Errors [2301-2317, 2321-2327]
515515
build_exceptions! {
516516
/// Database already exists
517517
DatabaseAlreadyExists(2301),
@@ -551,6 +551,8 @@ build_exceptions! {
551551
RowAccessPolicyAlreadyExists(2324),
552552
/// General failures met while garbage collecting database meta
553553
GeneralDbGcFailure(2325),
554+
/// Table snapshot is expired
555+
TableSnapshotExpired(2327),
554556
}
555557

556558
// Stage and Connection Errors [2501-2505, 2510-2512]

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ use databend_common_meta_app::schema::SequenceIdent;
101101
use databend_common_meta_app::schema::SetSecurityPolicyAction;
102102
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
103103
use databend_common_meta_app::schema::SetTableRowAccessPolicyReq;
104+
use databend_common_meta_app::schema::SnapshotLvtCheck;
104105
use databend_common_meta_app::schema::SwapTableReq;
105106
use databend_common_meta_app::schema::TableCopiedFileInfo;
106107
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
@@ -2642,6 +2643,7 @@ impl SchemaApiTestSuite {
26422643
mt: &MT,
26432644
) -> anyhow::Result<()> {
26442645
let tenant_name = "tenant1";
2646+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
26452647
let db_name = "db1";
26462648
let tbl_name = "tb2";
26472649

@@ -2717,6 +2719,7 @@ impl SchemaApiTestSuite {
27172719
seq: MatchSeq::Exact(table_version),
27182720
new_table_meta: new_table_meta.clone(),
27192721
base_snapshot_location: None,
2722+
lvt_check: None,
27202723
};
27212724

27222725
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2742,6 +2745,7 @@ impl SchemaApiTestSuite {
27422745
seq: MatchSeq::Exact(table_version + 1),
27432746
new_table_meta: new_table_meta.clone(),
27442747
base_snapshot_location: None,
2748+
lvt_check: None,
27452749
};
27462750
let res = mt
27472751
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2768,6 +2772,7 @@ impl SchemaApiTestSuite {
27682772
seq: MatchSeq::Exact(table_version),
27692773
new_table_meta: new_table_meta.clone(),
27702774
base_snapshot_location: None,
2775+
lvt_check: None,
27712776
};
27722777
let res = mt
27732778
.update_multi_table_meta_with_sender(
@@ -2849,6 +2854,7 @@ impl SchemaApiTestSuite {
28492854
seq: MatchSeq::Exact(table_version),
28502855
new_table_meta: new_table_meta.clone(),
28512856
base_snapshot_location: None,
2857+
lvt_check: None,
28522858
};
28532859
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
28542860
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2899,6 +2905,7 @@ impl SchemaApiTestSuite {
28992905
seq: MatchSeq::Exact(table_version),
29002906
new_table_meta: new_table_meta.clone(),
29012907
base_snapshot_location: None,
2908+
lvt_check: None,
29022909
};
29032910
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
29042911
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2949,6 +2956,7 @@ impl SchemaApiTestSuite {
29492956
seq: MatchSeq::Exact(table_version),
29502957
new_table_meta: new_table_meta.clone(),
29512958
base_snapshot_location: None,
2959+
lvt_check: None,
29522960
};
29532961
let result = mt
29542962
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2961,6 +2969,67 @@ impl SchemaApiTestSuite {
29612969
let err = ErrorCode::from(err);
29622970
assert_eq!(ErrorCode::DUPLICATED_UPSERT_FILES, err.code());
29632971
}
2972+
2973+
info!("--- update table meta, snapshot_ts must respect LVT");
2974+
{
2975+
let table = util.get_table().await.unwrap();
2976+
let table_id = table.ident.table_id;
2977+
let lvt_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
2978+
let lvt_time = DateTime::<Utc>::from_timestamp(2_000, 0).unwrap();
2979+
mt.set_table_lvt(&lvt_ident, &LeastVisibleTime::new(lvt_time))
2980+
.await?;
2981+
2982+
// Snapshot older than LVT should be rejected.
2983+
let mut new_table_meta = table.meta.clone();
2984+
new_table_meta.comment = "lvt guard should fail".to_string();
2985+
let bad_snapshot_ts = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
2986+
let req = UpdateTableMetaReq {
2987+
table_id,
2988+
seq: MatchSeq::Exact(table.ident.seq),
2989+
new_table_meta: new_table_meta.clone(),
2990+
base_snapshot_location: None,
2991+
lvt_check: Some(SnapshotLvtCheck {
2992+
tenant: tenant.clone(),
2993+
timestamp: bad_snapshot_ts,
2994+
}),
2995+
};
2996+
let err = mt
2997+
.update_multi_table_meta(UpdateMultiTableMetaReq {
2998+
update_table_metas: vec![(req, table.as_ref().clone())],
2999+
..Default::default()
3000+
})
3001+
.await
3002+
.unwrap_err();
3003+
assert_eq!(
3004+
ErrorCode::TABLE_SNAPSHOT_EXPIRED,
3005+
ErrorCode::from(err).code()
3006+
);
3007+
3008+
// Snapshot newer than LVT should succeed.
3009+
let table = util.get_table().await.unwrap();
3010+
let mut ok_table_meta = table.meta.clone();
3011+
ok_table_meta.comment = "lvt guard success".to_string();
3012+
let ok_snapshot_ts = DateTime::<Utc>::from_timestamp(2_001, 0).unwrap();
3013+
let req = UpdateTableMetaReq {
3014+
table_id,
3015+
seq: MatchSeq::Exact(table.ident.seq),
3016+
new_table_meta: ok_table_meta.clone(),
3017+
base_snapshot_location: None,
3018+
lvt_check: Some(SnapshotLvtCheck {
3019+
tenant,
3020+
timestamp: ok_snapshot_ts,
3021+
}),
3022+
};
3023+
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
3024+
update_table_metas: vec![(req, table.as_ref().clone())],
3025+
..Default::default()
3026+
})
3027+
.await?
3028+
.unwrap();
3029+
3030+
let updated = util.get_table().await.unwrap();
3031+
assert_eq!(updated.meta.comment, "lvt guard success");
3032+
}
29643033
}
29653034
Ok(())
29663035
}
@@ -4302,6 +4371,7 @@ impl SchemaApiTestSuite {
43024371
seq: MatchSeq::Any,
43034372
new_table_meta: table_meta.clone(),
43044373
base_snapshot_location: None,
4374+
lvt_check: None,
43054375
};
43064376

43074377
let table = mt
@@ -4442,6 +4512,7 @@ impl SchemaApiTestSuite {
44424512
seq: MatchSeq::Any,
44434513
new_table_meta: create_table_meta.clone(),
44444514
base_snapshot_location: None,
4515+
lvt_check: None,
44454516
};
44464517

44474518
let table = mt
@@ -6239,6 +6310,7 @@ impl SchemaApiTestSuite {
62396310
seq: MatchSeq::Any,
62406311
new_table_meta: table_meta(created_on),
62416312
base_snapshot_location: None,
6313+
lvt_check: None,
62426314
};
62436315

62446316
let table = mt
@@ -6290,6 +6362,7 @@ impl SchemaApiTestSuite {
62906362
seq: MatchSeq::Any,
62916363
new_table_meta: table_meta(created_on),
62926364
base_snapshot_location: None,
6365+
lvt_check: None,
62936366
};
62946367

62956368
let table = mt
@@ -7803,6 +7876,7 @@ impl SchemaApiTestSuite {
78037876
seq: MatchSeq::Any,
78047877
new_table_meta: table_meta(created_on),
78057878
base_snapshot_location: None,
7879+
lvt_check: None,
78067880
};
78077881

78087882
let table = mt
@@ -7862,6 +7936,7 @@ impl SchemaApiTestSuite {
78627936
seq: MatchSeq::Any,
78637937
new_table_meta: table_meta(created_on),
78647938
base_snapshot_location: None,
7939+
lvt_check: None,
78657940
};
78667941

78677942
let table = mt
@@ -7918,6 +7993,7 @@ impl SchemaApiTestSuite {
79187993
seq: MatchSeq::Any,
79197994
new_table_meta: table_meta(created_on),
79207995
base_snapshot_location: None,
7996+
lvt_check: None,
79217997
};
79227998

79237999
let table = mt
@@ -8372,6 +8448,7 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83728448
seq: MatchSeq::Any,
83738449
new_table_meta: self.table_meta(),
83748450
base_snapshot_location: None,
8451+
lvt_check: None,
83758452
};
83768453

83778454
let req = UpdateMultiTableMetaReq {

src/meta/api/src/table_api.rs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
3232
use databend_common_meta_app::app_error::StreamAlreadyExists;
3333
use databend_common_meta_app::app_error::StreamVersionMismatched;
3434
use databend_common_meta_app::app_error::TableAlreadyExists;
35+
use databend_common_meta_app::app_error::TableSnapshotExpired;
3536
use databend_common_meta_app::app_error::TableVersionMismatched;
3637
use databend_common_meta_app::app_error::UndropTableHasNoHistory;
3738
use databend_common_meta_app::app_error::UndropTableWithNoDropTime;
@@ -94,11 +95,13 @@ use databend_common_meta_kvapi::kvapi;
9495
use databend_common_meta_kvapi::kvapi::DirName;
9596
use databend_common_meta_kvapi::kvapi::Key;
9697
use databend_common_meta_kvapi::kvapi::KvApiExt;
98+
use databend_common_meta_types::ConditionResult;
9799
use databend_common_meta_types::ConditionResult::Eq;
98100
use databend_common_meta_types::MatchSeqExt;
99101
use databend_common_meta_types::MetaError;
100102
use databend_common_meta_types::MetaId;
101103
use databend_common_meta_types::SeqV;
104+
use databend_common_meta_types::TxnCondition;
102105
use databend_common_meta_types::TxnGetRequest;
103106
use databend_common_meta_types::TxnGetResponse;
104107
use databend_common_meta_types::TxnOp;
@@ -1201,19 +1204,19 @@ where
12011204
})
12021205
.collect::<Vec<_>>();
12031206
let mut tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(self, &tid_vec).await?;
1204-
for (req, (tb_meta_seq, table_meta)) in
1207+
for ((req, _), (tb_meta_seq, table_meta)) in
12051208
update_table_metas.iter().zip(tb_meta_vec.iter_mut())
12061209
{
1207-
let req_seq = req.0.seq;
1210+
let req_seq = req.seq;
12081211

12091212
if *tb_meta_seq == 0 || table_meta.is_none() {
12101213
return Err(KVAppError::AppError(AppError::UnknownTableId(
1211-
UnknownTableId::new(req.0.table_id, "update_multi_table_meta"),
1214+
UnknownTableId::new(req.table_id, "update_multi_table_meta"),
12121215
)));
12131216
}
12141217
if req_seq.match_seq(tb_meta_seq).is_err() {
12151218
mismatched_tbs.push((
1216-
req.0.table_id,
1219+
req.table_id,
12171220
*tb_meta_seq,
12181221
std::mem::take(table_meta).unwrap(),
12191222
));
@@ -1225,27 +1228,39 @@ where
12251228
}
12261229

12271230
let mut new_table_meta_map: BTreeMap<u64, TableMeta> = BTreeMap::new();
1228-
for (req, (tb_meta_seq, table_meta)) in
1231+
for ((req, _), (tb_meta_seq, table_meta)) in
12291232
update_table_metas.iter_mut().zip(tb_meta_vec.iter())
12301233
{
12311234
let tbid = TableId {
1232-
table_id: req.0.table_id,
1235+
table_id: req.table_id,
12331236
};
12341237
// `update_table_meta` MUST NOT modify `shared_by` field
12351238
let table_meta = table_meta.as_ref().unwrap();
12361239

1237-
let mut new_table_meta = req.0.new_table_meta.clone();
1240+
let mut new_table_meta = req.new_table_meta.clone();
12381241
new_table_meta.shared_by = table_meta.shared_by.clone();
12391242

1240-
tbl_seqs.insert(req.0.table_id, *tb_meta_seq);
1243+
tbl_seqs.insert(req.table_id, *tb_meta_seq);
12411244
txn.condition.push(txn_cond_seq(&tbid, Eq, *tb_meta_seq));
1245+
1246+
// Add LVT check if provided
1247+
if let Some(check) = req.lvt_check.as_ref() {
1248+
let lvt_ident = LeastVisibleTimeIdent::new(&check.tenant, req.table_id);
1249+
let check_lvt_value = LeastVisibleTime::new(check.timestamp);
1250+
txn.condition.push(TxnCondition::match_value(
1251+
lvt_ident.to_string_key(),
1252+
ConditionResult::Le,
1253+
serialize_struct(&check_lvt_value)?,
1254+
));
1255+
}
1256+
12421257
txn.if_then
12431258
.push(txn_op_put(&tbid, serialize_struct(&new_table_meta)?));
12441259
txn.else_then.push(TxnOp {
12451260
request: Some(Request::Get(TxnGetRequest::new(tbid.to_string_key()))),
12461261
});
12471262

1248-
new_table_meta_map.insert(req.0.table_id, new_table_meta);
1263+
new_table_meta_map.insert(req.table_id, new_table_meta);
12491264
}
12501265

12511266
// `remove_table_copied_files` and `upsert_table_copied_file_info`
@@ -1385,6 +1400,27 @@ where
13851400
),
13861401
)))
13871402
} else {
1403+
// Check if the transaction failed due to LVT check
1404+
if update_table_metas.len() == 1 {
1405+
let req = &update_table_metas[0].0;
1406+
if let Some(check) = req.lvt_check.as_ref() {
1407+
let lvt_ident = LeastVisibleTimeIdent::new(&check.tenant, req.table_id);
1408+
let current_lvt: Option<SeqV<LeastVisibleTime>> =
1409+
self.get_pb(&lvt_ident).await?;
1410+
if let Some(lvt_data) = current_lvt {
1411+
let lvt_time = lvt_data.data.time;
1412+
let snapshot_ts = check.timestamp;
1413+
1414+
// If LVT > snapshot_ts, the snapshot has expired
1415+
if lvt_time > snapshot_ts {
1416+
return Err(KVAppError::AppError(AppError::TableSnapshotExpired(
1417+
TableSnapshotExpired::new(req.table_id, snapshot_ts, lvt_time),
1418+
)));
1419+
}
1420+
}
1421+
}
1422+
}
1423+
13881424
// if all table version does match, but tx failed, we don't know why, just return error
13891425
Err(KVAppError::AppError(AppError::from(
13901426
MultiStmtTxnCommitFailed::new("update_multi_table_meta"),

src/meta/app/src/app_error.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,26 @@ impl TableLockExpired {
867867
}
868868
}
869869

870+
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
871+
#[error(
872+
"Snapshot timestamp {snapshot_ts} for table {table_id} is older than the table's least visible time {lvt}"
873+
)]
874+
pub struct TableSnapshotExpired {
875+
table_id: u64,
876+
snapshot_ts: DateTime<Utc>,
877+
lvt: DateTime<Utc>,
878+
}
879+
880+
impl TableSnapshotExpired {
881+
pub fn new(table_id: u64, snapshot_ts: DateTime<Utc>, lvt: DateTime<Utc>) -> Self {
882+
Self {
883+
table_id,
884+
snapshot_ts,
885+
lvt,
886+
}
887+
}
888+
}
889+
870890
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
871891
#[error(
872892
"CannotShareDatabaseCreatedFromShare: cannot share database {database_name} which created from share while {context}"
@@ -1129,6 +1149,9 @@ pub enum AppError {
11291149
#[error(transparent)]
11301150
TableLockExpired(#[from] TableLockExpired),
11311151

1152+
#[error(transparent)]
1153+
TableSnapshotExpired(#[from] TableSnapshotExpired),
1154+
11321155
#[error(transparent)]
11331156
CannotShareDatabaseCreatedFromShare(#[from] CannotShareDatabaseCreatedFromShare),
11341157

@@ -1476,6 +1499,15 @@ impl AppErrorMessage for TableLockExpired {
14761499
}
14771500
}
14781501

1502+
impl AppErrorMessage for TableSnapshotExpired {
1503+
fn message(&self) -> String {
1504+
format!(
1505+
"Snapshot timestamp {} for table {} is older than the table's least visible time {}",
1506+
self.snapshot_ts, self.table_id, self.lvt,
1507+
)
1508+
}
1509+
}
1510+
14791511
impl AppErrorMessage for CannotShareDatabaseCreatedFromShare {
14801512
fn message(&self) -> String {
14811513
format!(
@@ -1692,6 +1724,7 @@ impl From<AppError> for ErrorCode {
16921724
ErrorCode::UnknownShareEndpointId(err.message())
16931725
}
16941726
AppError::TableLockExpired(err) => ErrorCode::TableLockExpired(err.message()),
1727+
AppError::TableSnapshotExpired(err) => ErrorCode::TableSnapshotExpired(err.message()),
16951728
AppError::CannotShareDatabaseCreatedFromShare(err) => {
16961729
ErrorCode::CannotShareDatabaseCreatedFromShare(err.message())
16971730
}

0 commit comments

Comments
 (0)