Bug#32169848 THD_NDB TRANSACTION FUNCTIONALITY [#15]
Move functionality for retrieving table statistics into its own file.
Add function comments and make the function less dependent on other
parts of the ndbcluster plugin.
- remove the unnecessary NdbRecord argument; instead use the table's
  default NdbRecord, which is enough since there are no real columns
  in the scan.
- return NdbError to the caller for further error handling

This also fixes a problem where the error code of each retried
temporary error was pushed as a warning, resulting in 100 warnings
for the failing queries in ndb_transaction_memory_shortage_dbacc.test

Change-Id: Icf0a53378101cc661506a20a8ad27501e66e4305
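
The diff for the new storage/ndb/plugin/ndb_stats.h is not rendered below, so the following is only a minimal sketch of what it plausibly declares, reconstructed from the call site in ha_ndbcluster.cc and the Table_stats struct removed from NDB_SHARE. The return type, the default argument, and the exact field types are assumptions.

// Hypothetical sketch of storage/ndb/plugin/ndb_stats.h; names and types
// not visible in the rendered diff are assumptions.
#include "storage/ndb/include/ndbapi/NdbApi.hpp"

class THD;

// Statistics for one NDB table (previously NDB_SHARE::Table_stats).
// row_size is assumed widened to Uint64, since the caller now narrows
// it with static_cast<ulong>.
struct Ndb_table_stats {
  Uint64 row_count{0};
  Uint64 row_size{0};
  Uint64 fragment_memory{0};
  Uint64 fragment_extent_space{0};
  Uint64 fragment_extent_free_space{0};
};

// Retrieve fresh statistics for 'table' (or for one partition, when
// 'part_id' is a legal partition id) by scanning pseudo-columns using the
// table's default NdbRecord. Returns true on failure and fills 'ndb_error'
// so the caller can do further error handling.
bool ndb_get_table_statistics(THD *thd, Ndb *ndb,
                              const NdbDictionary::Table *table,
                              Ndb_table_stats *stats, NdbError &ndb_error,
                              uint part_id = ~(uint)0);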
blaudden committed May 19, 2021
1 parent 0794d39 commit aa81047
Showing 6 changed files with 325 additions and 1,239 deletions.
1,108 changes: 54 additions & 1,054 deletions mysql-test/suite/ndb/r/ndb_transaction_memory_shortage_dbacc.result

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions storage/ndb/CMakeLists.txt
@@ -102,6 +102,7 @@ SET(NDBCLUSTER_SOURCES
   plugin/ndb_upgrade_util.cc
   plugin/ndb_mysql_services.cc
   plugin/ndb_blobs_buffer.cc
+  plugin/ndb_stats.cc
 )
 
 NDB_ADD_TEST("ndb_bitmap-t" "plugin/ndb_bitmap.cc" LIBS ndbgeneral)
190 changes: 13 additions & 177 deletions storage/ndb/plugin/ha_ndbcluster.cc
@@ -93,6 +93,7 @@
 #include "storage/ndb/plugin/ndb_schema_trans_guard.h"
 #include "storage/ndb/plugin/ndb_server_hooks.h"
 #include "storage/ndb/plugin/ndb_sleep.h"
+#include "storage/ndb/plugin/ndb_stats.h"
 #include "storage/ndb/plugin/ndb_table_guard.h"
 #include "storage/ndb/plugin/ndb_tdc.h"
 #include "storage/ndb/plugin/ndb_thd.h"
@@ -421,11 +422,6 @@ mysql_cond_t ndbcluster_cond;
 static const char *ndbcluster_hton_name = "ndbcluster";
 static const int ndbcluster_hton_name_length = sizeof(ndbcluster_hton_name) - 1;
 
-static int ndb_get_table_statistics(THD *thd, ha_ndbcluster *, Ndb *,
-                                    const NdbDictionary::Table *,
-                                    const NdbRecord *, NDB_SHARE::Table_stats *,
-                                    uint part_id = ~(uint)0);
-
 static ulong multi_range_fixed_size(int num_ranges);
 
 static ulong multi_range_max_entry(NDB_INDEX_TYPE keytype, ulong reclength);
@@ -12878,18 +12874,24 @@ int ha_ndbcluster::update_stats(THD *thd, bool do_read_stat, uint part_id) {
   Thd_ndb *thd_ndb = get_thd_ndb(thd);
   DBUG_TRACE;
 
-  NDB_SHARE::Table_stats table_stats;
+  Ndb_table_stats table_stats;
   if (!do_read_stat) {
     // Just use the cached stats from NDB_SHARE without reading from NDB
     mysql_mutex_lock(&m_share->mutex);
     table_stats = m_share->cached_table_stats;
     mysql_mutex_unlock(&m_share->mutex);
   } else {
     // Request stats from NDB
-    if (int err =
-            ndb_get_table_statistics(thd, this, thd_ndb->ndb, m_table,
-                                     m_ndb_record, &table_stats, part_id)) {
-      return err;
+    NdbError ndb_error;
+    if (ndb_get_table_statistics(thd, thd_ndb->ndb, m_table, &table_stats,
+                                 ndb_error, part_id)) {
+      if (ndb_error.classification == NdbError::SchemaError) {
+        // Updating stats for the table failed due to a schema error. Mark
+        // the NDB table def as invalid; this will also cause all index defs
+        // to be invalidated on close.
+        m_table->setStatusInvalid();
+      }
+      return ndb_to_mysql_error(&ndb_error);
     }
 
     // Update cached stats in NDB_SHARE with fresh data
@@ -12905,7 +12907,7 @@ int ha_ndbcluster::update_stats(THD *thd, bool do_read_stat, uint part_id) {
     no_uncommitted_rows_count = m_table_info->no_uncommitted_rows_count;
   }
   // Update values in handler::stats (another "records")
-  stats.mean_rec_length = table_stats.row_size;
+  stats.mean_rec_length = static_cast<ulong>(table_stats.row_size);
   stats.data_file_length = table_stats.fragment_memory;
   stats.records = table_stats.row_count + no_uncommitted_rows_count;
   stats.max_data_file_length = table_stats.fragment_extent_space;
@@ -12923,172 +12925,6 @@ int ha_ndbcluster::update_stats(THD *thd, bool do_read_stat, uint part_id) {
   return 0;
 }
 
-/* If part_id contains a legal partition id, ndbstat returns the
-   partition-statistics pertaining to that partition only.
-   Otherwise, it returns the table-statistics,
-   which is an aggregate over all partitions of that table.
-*/
-static int ndb_get_table_statistics(THD *thd, ha_ndbcluster *file, Ndb *ndb,
-                                    const NdbDictionary::Table *tab,
-                                    const NdbRecord *record,
-                                    NDB_SHARE::Table_stats *stats,
-                                    uint part_id) {
-  Thd_ndb *thd_ndb = get_thd_ndb(current_thd);
-  NdbTransaction *pTrans;
-  NdbError error;
-  int retries = 100;
-  int reterr = 0;
-  const char *dummyRowPtr;
-  NdbOperation::GetValueSpec extraGets[7];
-  Uint64 rows, fixed_mem, var_mem, ext_space, free_ext_space;
-  Uint32 size, fragid;
-
-  DBUG_TRACE;
-
-  assert(record != 0);
-
-  /* We use the passed in NdbRecord just to get access to the
-     table, we mask out any/all columns it may have and add
-     our reads as extraGets. This is necessary as they are
-     all pseudo-columns
-  */
-  extraGets[0].column = NdbDictionary::Column::ROW_COUNT;
-  extraGets[0].appStorage = &rows;
-  extraGets[1].column = NdbDictionary::Column::ROW_SIZE;
-  extraGets[1].appStorage = &size;
-  extraGets[2].column = NdbDictionary::Column::FRAGMENT_FIXED_MEMORY;
-  extraGets[2].appStorage = &fixed_mem;
-  extraGets[3].column = NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY;
-  extraGets[3].appStorage = &var_mem;
-  extraGets[4].column = NdbDictionary::Column::FRAGMENT_EXTENT_SPACE;
-  extraGets[4].appStorage = &ext_space;
-  extraGets[5].column = NdbDictionary::Column::FRAGMENT_FREE_EXTENT_SPACE;
-  extraGets[5].appStorage = &free_ext_space;
-  extraGets[6].column = NdbDictionary::Column::FRAGMENT;
-  extraGets[6].appStorage = &fragid;
-
-  const Uint32 codeWords = 1;
-  Uint32 codeSpace[codeWords];
-  NdbInterpretedCode code(NULL,  // Table is irrelevant
-                          &codeSpace[0], codeWords);
-  if ((code.interpret_exit_last_row() != 0) || (code.finalise() != 0)) {
-    reterr = code.getNdbError().code;
-    DBUG_PRINT("exit", ("failed, reterr: %u, NdbError %u(%s)", reterr,
-                        error.code, error.message));
-    return reterr;
-  }
-
-  do {
-    Uint32 count = 0;
-    Uint64 sum_rows = 0;
-    Uint64 sum_row_size = 0;
-    Uint64 sum_mem = 0;
-    Uint64 sum_ext_space = 0;
-    Uint64 sum_free_ext_space = 0;
-    NdbScanOperation *pOp;
-    int check;
-
-    if ((pTrans = ndb->startTransaction(tab)) == NULL) {
-      error = ndb->getNdbError();
-      goto retry;
-    }
-
-    NdbScanOperation::ScanOptions options;
-    options.optionsPresent = NdbScanOperation::ScanOptions::SO_BATCH |
-                             NdbScanOperation::ScanOptions::SO_GETVALUE |
-                             NdbScanOperation::ScanOptions::SO_INTERPRETED;
-    /* Set batch_size=1, as we need only one row per fragment. */
-    options.batch = 1;
-    options.extraGetValues = &extraGets[0];
-    options.numExtraGetValues = sizeof(extraGets) / sizeof(extraGets[0]);
-    options.interpretedCode = &code;
-
-    if ((pOp = pTrans->scanTable(
-             record, NdbOperation::LM_CommittedRead, empty_mask, &options,
-             sizeof(NdbScanOperation::ScanOptions))) == NULL) {
-      error = pTrans->getNdbError();
-      goto retry;
-    }
-    thd_ndb->m_scan_count++;
-    thd_ndb->m_pruned_scan_count += (pOp->getPruned() ? 1 : 0);
-
-    thd_ndb->m_execute_count++;
-    DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
-    if (pTrans->execute(NdbTransaction::NoCommit, NdbOperation::AbortOnError,
-                        true) == -1) {
-      error = pTrans->getNdbError();
-      goto retry;
-    }
-
-    while ((check = pOp->nextResult(&dummyRowPtr, true, true)) == 0) {
-      DBUG_PRINT("info",
-                 ("nextResult rows: %llu, "
-                  "fixed_mem_size %llu var_mem_size %llu "
-                  "fragmentid %u extent_space %llu free_extent_space %llu",
-                  rows, fixed_mem, var_mem, fragid, ext_space, free_ext_space));
-
-      if ((part_id != ~(uint)0) && fragid != part_id) {
-        continue;
-      }
-
-      sum_rows += rows;
-      if (sum_row_size < size) sum_row_size = size;
-      sum_mem += fixed_mem + var_mem;
-      count++;
-      sum_ext_space += ext_space;
-      sum_free_ext_space += free_ext_space;
-
-      if ((part_id != ~(uint)0) && fragid == part_id) {
-        break;
-      }
-    }
-
-    if (check == -1) {
-      error = pOp->getNdbError();
-      goto retry;
-    }
-
-    pOp->close(true);
-
-    ndb->closeTransaction(pTrans);
-
-    stats->row_count = sum_rows;
-    stats->row_size = (ulong)sum_row_size;
-    stats->fragment_memory = sum_mem;
-    stats->fragment_extent_space = sum_ext_space;
-    stats->fragment_extent_free_space = sum_free_ext_space;
-
-    DBUG_PRINT("exit", ("records: %llu row_size: %llu "
-                        "mem: %llu allocated: %llu free: %llu count: %u",
-                        sum_rows, sum_row_size, sum_mem, sum_ext_space,
-                        sum_free_ext_space, count));
-
-    return 0;
-  retry:
-    if (file && pTrans) {
-      reterr = file->ndb_err(pTrans);
-    } else {
-      const NdbError &tmp = error;
-      ERR_PRINT(tmp);
-      reterr = ndb_to_mysql_error(&tmp);
-    }
-
-    if (pTrans) {
-      ndb->closeTransaction(pTrans);
-      pTrans = NULL;
-    }
-    if (error.status == NdbError::TemporaryError && retries-- &&
-        !thd_killed(thd)) {
-      ndb_trans_retry_sleep();
-      continue;
-    }
-    break;
-  } while (1);
-  DBUG_PRINT("exit", ("failed, reterr: %u, NdbError %u(%s)", reterr, error.code,
-                      error.message));
-  return reterr;
-}
-
 void ha_ndbcluster::check_read_before_write_removal() {
   DBUG_TRACE;
 
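The replacement for the function removed above lives in the new ndb_stats.cc, which is not rendered in this view. A minimal sketch of the retry shape implied by the commit message, where temporary errors are retried silently and only the final NdbError reaches the caller, might look like the following; 'stats_scan_once' is a hypothetical helper standing in for one attempt of the pseudo-column scan removed above.

// Hypothetical sketch only: the real ndb_stats.cc is not shown in this
// commit view. Uses Ndb_table_stats from the sketch near the top.
static bool ndb_get_table_statistics_sketch(THD *thd, Ndb *ndb,
                                            const NdbDictionary::Table *table,
                                            Ndb_table_stats *stats,
                                            NdbError &ndb_error,
                                            uint part_id) {
  int retries = 100;
  do {
    if (stats_scan_once(ndb, table, stats, part_id, ndb_error)) {
      return false;  // success
    }
    if (ndb_error.status != NdbError::TemporaryError || thd_killed(thd)) {
      break;  // permanent error or killed thread; give up
    }
    // NOTE: no warning is pushed per retry anymore; per-retry warnings were
    // the source of the 100 warnings mentioned in the commit message.
    ndb_trans_retry_sleep();
  } while (retries-- > 0);
  return true;  // failure: the caller converts 'ndb_error' to a MySQL error
}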
11 changes: 3 additions & 8 deletions storage/ndb/plugin/ndb_share.h
@@ -32,7 +32,8 @@
 
 #include "my_alloc.h"  // MEM_ROOT
 #include "storage/ndb/include/ndbapi/Ndb.hpp"  // Ndb::TupleIdRange
-#include "thr_lock.h"  // THR_LOCK
+#include "storage/ndb/plugin/ndb_stats.h"
+#include "thr_lock.h"  // THR_LOCK
 
 class ha_ndbcluster;
 class NdbEventOperation;
@@ -96,13 +97,7 @@ struct NDB_SHARE {
   // This is cached values and used in queries when that is "good enough",
   // in other cases the values are updated from NDB before use.
   // NOTE! Protected by NDB_SHARE::mutex
-  struct Table_stats {
-    Uint64 row_count;
-    ulong row_size;
-    Uint64 fragment_memory;
-    Uint64 fragment_extent_space;
-    Uint64 fragment_extent_free_space;
-  } cached_table_stats;
+  Ndb_table_stats cached_table_stats;
   // Update cached row count with number of changed rows
   void update_cached_row_count(int changed_rows);
 
(Diffs for the two remaining files, the new storage/ndb/plugin/ndb_stats.cc and storage/ndb/plugin/ndb_stats.h, are not rendered.)
