@@ -54,47 +54,44 @@ void ConcurrencyControl::refreshGlobalState() {
54
54
Worker::sNewestOlapStartTx .store (localNewestOlap,
55
55
std::memory_order_release);
56
56
57
- TXID global_all_lwm_buffer = std::numeric_limits<TXID>::max ();
58
- TXID global_oltp_lwm_buffer = std::numeric_limits<TXID>::max ();
59
- bool skipped_a_worker = false ;
57
+ TXID globalAllLwmBuffer = std::numeric_limits<TXID>::max ();
58
+ TXID globalOltpLwmBuffer = std::numeric_limits<TXID>::max ();
59
+ bool skippedAWorker = false ;
60
60
for (WORKERID i = 0 ; i < Worker::my ().mNumAllWorkers ; i++) {
61
61
ConcurrencyControl& workerState = other (i);
62
62
if (workerState.mLatestLwm4Tx == workerState.mLatestWriteTx ) {
63
- skipped_a_worker = true ;
63
+ skippedAWorker = true ;
64
64
continue ;
65
- } else {
66
- workerState.mLatestLwm4Tx .store (workerState.mLatestWriteTx ,
67
- std::memory_order_release);
68
65
}
69
-
70
- TXID its_all_lwm_buffer =
66
+ workerState.mLatestLwm4Tx .store (workerState.mLatestWriteTx ,
67
+ std::memory_order_release);
68
+ TXID itsAllLwmBuffer =
71
69
workerState.commit_tree .LCB (Worker::sOldestAllStartTs );
72
- TXID its_oltp_lwm_buffer =
70
+ TXID itsOltpLwmBuffer =
73
71
workerState.commit_tree .LCB (Worker::sOldestOltpStartTx );
74
72
75
73
if (FLAGS_olap_mode &&
76
74
Worker::sOldestAllStartTs != Worker::sOldestOltpStartTx ) {
77
- global_oltp_lwm_buffer =
78
- std::min<TXID>(its_oltp_lwm_buffer, global_oltp_lwm_buffer );
75
+ globalOltpLwmBuffer =
76
+ std::min<TXID>(itsOltpLwmBuffer, globalOltpLwmBuffer );
79
77
} else {
80
- its_oltp_lwm_buffer = its_all_lwm_buffer ;
78
+ itsOltpLwmBuffer = itsAllLwmBuffer ;
81
79
}
82
80
83
- global_all_lwm_buffer =
84
- std::min<TXID>(its_all_lwm_buffer, global_all_lwm_buffer);
81
+ globalAllLwmBuffer = std::min<TXID>(itsAllLwmBuffer, globalAllLwmBuffer);
85
82
86
83
workerState.local_lwm_latch .store (workerState.local_lwm_latch .load () + 1 ,
87
84
std::memory_order_release); // Latch
88
- workerState.all_lwm_receiver .store (its_all_lwm_buffer ,
85
+ workerState.all_lwm_receiver .store (itsAllLwmBuffer ,
89
86
std::memory_order_release);
90
- workerState.oltp_lwm_receiver .store (its_oltp_lwm_buffer ,
87
+ workerState.oltp_lwm_receiver .store (itsOltpLwmBuffer ,
91
88
std::memory_order_release);
92
89
workerState.local_lwm_latch .store (workerState.local_lwm_latch .load () + 1 ,
93
90
std::memory_order_release); // Release
94
91
}
95
- if (!skipped_a_worker ) {
96
- Worker::sAllLwm .store (global_all_lwm_buffer , std::memory_order_release);
97
- Worker::sOltpLwm .store (global_oltp_lwm_buffer , std::memory_order_release);
92
+ if (!skippedAWorker ) {
93
+ Worker::sAllLwm .store (globalAllLwmBuffer , std::memory_order_release);
94
+ Worker::sOltpLwm .store (globalOltpLwmBuffer , std::memory_order_release);
98
95
}
99
96
100
97
Worker::sGlobalMutex .unlock ();
@@ -133,12 +130,12 @@ void ConcurrencyControl::garbageCollection() {
133
130
// fix, it should be enough if we purge in small batches
134
131
utils::Timer timer (CRCounters::myCounters ().cc_ms_gc );
135
132
synclwm : {
136
- u64 lwm_version = local_lwm_latch.load ();
137
- while ((lwm_version = local_lwm_latch.load ()) & 1 ) {
133
+ u64 lwmVersion = local_lwm_latch.load ();
134
+ while ((lwmVersion = local_lwm_latch.load ()) & 1 ) {
138
135
};
139
136
local_all_lwm = all_lwm_receiver.load ();
140
137
local_oltp_lwm = oltp_lwm_receiver.load ();
141
- if (lwm_version != local_lwm_latch.load ()) {
138
+ if (lwmVersion != local_lwm_latch.load ()) {
142
139
goto synclwm;
143
140
}
144
141
ENSURE (!FLAGS_olap_mode || local_all_lwm <= local_oltp_lwm);
@@ -168,10 +165,10 @@ synclwm : {
168
165
local_oltp_lwm > cleaned_untill_oltp_lwm) {
169
166
utils::Timer timer (CRCounters::myCounters ().cc_ms_gc_graveyard );
170
167
// MOVE deletes to the graveyard
171
- const u64 from_tx_id =
168
+ const u64 fromTxId =
172
169
cleaned_untill_oltp_lwm > 0 ? cleaned_untill_oltp_lwm : 0 ;
173
170
mHistoryTree .visitRemoveVersions (
174
- Worker::my ().mWorkerId , from_tx_id , local_oltp_lwm - 1 ,
171
+ Worker::my ().mWorkerId , fromTxId , local_oltp_lwm - 1 ,
175
172
[&](const TXID tx_id, const TREEID treeId, const u8* version_payload,
176
173
[[maybe_unused]] u64 version_payload_length,
177
174
const bool called_before) {
@@ -189,28 +186,27 @@ synclwm : {
189
186
}
190
187
191
188
ConcurrencyControl::VISIBILITY ConcurrencyControl::isVisibleForIt (
192
- WORKERID whom_worker_id , TXID commitTs) {
193
- return local_workers_start_ts[whom_worker_id ] > commitTs
189
+ WORKERID whomWorkerId , TXID commitTs) {
190
+ return local_workers_start_ts[whomWorkerId ] > commitTs
194
191
? VISIBILITY::VISIBLE_ALREADY
195
192
: VISIBILITY::VISIBLE_NEXT_ROUND;
196
193
}
197
194
198
195
// UNDETERMINED is not possible atm because we spin on startTs
199
196
ConcurrencyControl::VISIBILITY ConcurrencyControl::isVisibleForIt (
200
- WORKERID whom_worker_id, WORKERID what_worker_id, TXID tx_ts) {
201
- const bool is_commit_ts = tx_ts & MSB;
202
- const TXID commitTs = is_commit_ts
203
- ? (tx_ts & MSB_MASK)
204
- : getCommitTimestamp (what_worker_id, tx_ts);
205
- return isVisibleForIt (whom_worker_id, commitTs);
197
+ WORKERID whomWorkerId, WORKERID whatWorkerId, TXID txTs) {
198
+ const bool is_commit_ts = txTs & MSB;
199
+ const TXID commitTs =
200
+ is_commit_ts ? (txTs & MSB_MASK) : getCommitTimestamp (whatWorkerId, txTs);
201
+ return isVisibleForIt (whomWorkerId, commitTs);
206
202
}
207
203
208
- TXID ConcurrencyControl::getCommitTimestamp (WORKERID workerId, TXID tx_ts ) {
209
- if (tx_ts & MSB) {
210
- return tx_ts & MSB_MASK;
204
+ TXID ConcurrencyControl::getCommitTimestamp (WORKERID workerId, TXID txTs ) {
205
+ if (txTs & MSB) {
206
+ return txTs & MSB_MASK;
211
207
}
212
- DCHECK ((tx_ts & MSB) || isVisibleForMe (workerId, tx_ts ));
213
- const TXID& startTs = tx_ts ;
208
+ DCHECK ((txTs & MSB) || isVisibleForMe (workerId, txTs ));
209
+ const TXID& startTs = txTs ;
214
210
TXID lcb = other (workerId).commit_tree .LCB (startTs);
215
211
TXID commitTs =
216
212
lcb ? lcb : std::numeric_limits<TXID>::max (); // TODO: align with GC
@@ -280,12 +276,12 @@ bool ConcurrencyControl::isVisibleForMe(WORKERID workerId, u64 txId,
280
276
return true ;
281
277
}
282
278
utils::Timer timer (CRCounters::myCounters ().cc_ms_snapshotting );
283
- TXID largest_visible_tx_id =
279
+ TXID largestVisibleTxId =
284
280
other (workerId).commit_tree .LCB (Worker::my ().mActiveTx .startTS ());
285
- if (largest_visible_tx_id ) {
286
- mLocalSnapshotCache [workerId] = largest_visible_tx_id ;
281
+ if (largestVisibleTxId ) {
282
+ mLocalSnapshotCache [workerId] = largestVisibleTxId ;
287
283
local_snapshot_cache_ts[workerId] = Worker::my ().mActiveTx .startTS ();
288
- return largest_visible_tx_id >= startTs;
284
+ return largestVisibleTxId >= startTs;
289
285
}
290
286
return false ;
291
287
}
@@ -323,14 +319,15 @@ std::optional<std::pair<TXID, TXID>> ConcurrencyControl::CommitTree::LCBUnsafe(
323
319
std::lower_bound (begin, end, startTs, [&](const auto & pair, TXID ts) {
324
320
return pair.first < ts;
325
321
});
322
+
326
323
if (it == begin) {
327
324
// raise(SIGTRAP);
328
325
return {};
329
- } else {
330
- it--;
331
- assert (it->second < startTs);
332
- return *it;
333
326
}
327
+
328
+ it--;
329
+ assert (it->second < startTs);
330
+ return *it;
334
331
}
335
332
336
333
TXID ConcurrencyControl::CommitTree::LCB (TXID startTs) {
@@ -352,22 +349,22 @@ void ConcurrencyControl::CommitTree::cleanIfNecessary() {
352
349
utils::Timer timer (CRCounters::myCounters ().cc_ms_gc_cm );
353
350
std::set<std::pair<TXID, TXID>> set; // TODO: unordered_set
354
351
355
- const WORKERID my_worker_id = cr::Worker::Worker::my ().mWorkerId ;
352
+ const WORKERID myWorkerId = cr::Worker::Worker::my ().mWorkerId ;
356
353
for (WORKERID i = 0 ; i < cr::Worker::Worker::my ().mNumAllWorkers ; i++) {
357
- if (i == my_worker_id ) {
354
+ if (i == myWorkerId ) {
358
355
continue ;
359
356
}
360
- u64 its_start_ts = Worker::sWorkersCurrentSnapshot [i].load ();
361
- while (its_start_ts & Worker::LATCH_BIT) {
362
- its_start_ts = Worker::sWorkersCurrentSnapshot [i].load ();
357
+ u64 itsStartTs = Worker::sWorkersCurrentSnapshot [i].load ();
358
+ while (itsStartTs & Worker::LATCH_BIT) {
359
+ itsStartTs = Worker::sWorkersCurrentSnapshot [i].load ();
363
360
}
364
- its_start_ts &= Worker::CLEAN_BITS_MASK;
361
+ itsStartTs &= Worker::CLEAN_BITS_MASK;
365
362
set.insert (array[cursor - 1 ]); // for the new TX
366
- if (its_start_ts == 0 ) {
363
+ if (itsStartTs == 0 ) {
367
364
// to avoid race conditions when switching from RC to SI
368
365
set.insert (array[0 ]);
369
366
} else {
370
- auto v = LCBUnsafe (its_start_ts );
367
+ auto v = LCBUnsafe (itsStartTs );
371
368
if (v) {
372
369
set.insert (*v);
373
370
}
0 commit comments