-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcatalog_mgr.cc
110 lines (91 loc) · 3.27 KB
/
catalog_mgr.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include "catalog_mgr.h"
namespace ermia {
namespace catalog {
void create_schema_table(ermia::Engine *db, const char *name) {
ermia::thread::Thread *thread = ermia::thread::GetThread(true);
ALWAYS_ASSERT(thread);
auto create_table = [=](char *) {
db->CreateTable(name);
db->CreateMasstreePrimaryIndex(name, std::string(name));
ermia::schema_td = ermia::Catalog::GetTable(name);
};
thread->StartTask(create_table);
thread->Join();
ermia::thread::PutThread(thread);
}
void read_schema(transaction *t, ConcurrentMasstreeIndex *schema_table_index,
ConcurrentMasstreeIndex *target_table_index,
const varstr &table_name, varstr &out_schema_value, OID *out_schema_oid) {
auto *target_td = target_table_index->GetMasstree().get_table_descriptor();
bool present_in_table_set = t->get_table_set()->find(target_td->GetTupleFid());
#ifdef BLOCKDDL
if (!present_in_table_set) {
target_td->LockSchema(t->is_ddl());
// Refresh begin timestamp after lock is granted
t->GetXIDContext()->begin = dlog::current_csn.load(std::memory_order_relaxed);
}
#endif
bool schema_ready = true;
retry:
rc_t rc;
schema_table_index->GetRecord(t, rc, table_name, out_schema_value, out_schema_oid);
#ifdef BLOCKDDL
// Under blocking DDL this will always succeed
LOG_IF(FATAL, rc._val != RC_TRUE);
#else
if (rc._val != RC_TRUE) {
DLOG(INFO) << "Catalog: failed reading schema";
goto retry;
}
#endif
schema_kv::value schema_value_temp;
const schema_kv::value *schema = Decode(out_schema_value, schema_value_temp);
#ifdef COPYDDL
if (unlikely(t->is_ddl() && schema->state != ddl::schema_state_type::COMPLETE)) {
goto retry;
}
if (schema->state == ddl::schema_state_type::NOT_READY) {
#ifndef LAZYDDL
if (schema->ddl_type != ddl::ddl_type::COPY_ONLY || config::enable_cdc_schema_lock) {
goto retry;
}
target_td = target_table_index->GetMasstree().get_table_descriptor();
if (target_td->GetTupleFid() != schema->fid) {
goto retry;
} else {
t->SetWaitForNewSchema(true);
schema_ready = false;
}
#else // LAZYDDL
goto retry;
#endif
}
#endif // COPYDDL
if (!present_in_table_set) {
TableDescriptor *old_td = schema->old_fid ? Catalog::GetTable(schema->old_fid) : nullptr;
t->add_to_table_set(target_td, schema->fid, *out_schema_oid, schema->version, schema_ready, old_td);
}
}
rc_t write_schema(transaction *t, ConcurrentMasstreeIndex *schema_table_index,
const varstr &table_name, varstr &schema_value,
OID *out_schema_oid, ddl::ddl_executor *ddl_exe,
bool is_insert) {
// For DDL txn only
ALWAYS_ASSERT(t->is_ddl());
auto &schema_idx = schema_table_index->GetMasstree();
auto *target_td = schema_idx.get_table_descriptor();
OID oid = INVALID_OID;
rc_t rc = is_insert ? schema_table_index->InsertRecord(t, table_name, schema_value, &oid)
: schema_table_index->UpdateRecord(t, table_name, schema_value);
#ifdef BLOCKDDL
// Under blocking DDL this will always succeed
LOG_IF(FATAL, rc._val != RC_TRUE);
#endif
DLOG_IF(INFO, rc._val != RC_TRUE) << "Catalog: failed updating schema";
if (out_schema_oid) {
*out_schema_oid = oid;
}
return rc;
}
} // namespace catalog
} // namespace ermia