Skip to content

Commit

Permalink
Add dsa check to aqo_qtext_store
Browse files Browse the repository at this point in the history
Add validation function for dsm_size_max variable

Check if the data in files containing dsa segments can fit in dsm_size_max
Remove entry from aqo_queries if dsa is full
  • Loading branch information
Alexandra Pervushina committed Jun 28, 2023
1 parent b0c63cc commit 7a53c22
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 15 deletions.
18 changes: 15 additions & 3 deletions aqo.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ aqo_free_callback(ResourceReleasePhase phase,
}
}


/* Validation function for dsm_size_max GUC*/
static bool check_dsm_size_hook(int *newval, void **extra, GucSource source)
{
if (*newval < 0)
{
GUC_check_errdetail("dsm_size_max can't be smaller than zero");
return false;
}
return true;
}

void
_PG_init(void)
{
Expand Down Expand Up @@ -275,9 +287,9 @@ _PG_init(void)
&dsm_size_max,
100,
0, INT_MAX,
PGC_SUSET,
PGC_POSTMASTER,
0,
NULL,
check_dsm_size_hook,
NULL,
NULL
);
Expand Down Expand Up @@ -388,5 +400,5 @@ PG_FUNCTION_INFO_V1(invalidate_deactivated_queries_cache);
Datum
invalidate_deactivated_queries_cache(PG_FUNCTION_ARGS)
{
PG_RETURN_POINTER(NULL);
PG_RETURN_POINTER(NULL);
}
2 changes: 2 additions & 0 deletions aqo_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ aqo_init_shmem(void)
/* Doesn't use DSA, so can be loaded in postmaster */
aqo_stat_load();
aqo_queries_load();

check_dsa_file_size();
}
}

Expand Down
15 changes: 12 additions & 3 deletions preprocessing.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,23 @@ aqo_planner(Query *parse, const char *query_string, int cursorOptions,
query_context.learn_aqo, query_context.use_aqo,
query_context.auto_tuning, &aqo_queries_nulls))
{
bool dsa_valid = true;
/*
* Add query text into the ML-knowledge base. Just for further
* analysis. In the case of cached plans we may have NULL query text.
*/
if (!aqo_qtext_store(query_context.query_hash, query_string))
if (!aqo_qtext_store(query_context.query_hash, query_string, &dsa_valid))
{
Assert(0); /* panic only on debug installation */
elog(ERROR, "[AQO] Impossible situation was detected. Maybe not enough of shared memory?");
if (!dsa_valid)
{
disable_aqo_for_query();
elog(WARNING, "[AQO] Not enough DSA. AQO was disabled for this query");
}
else
{
Assert(0); /* panic only on debug installation */
elog(ERROR, "[AQO] Impossible situation was detected. Maybe not enough of shared memory?");
}
}
}
else
Expand Down
64 changes: 56 additions & 8 deletions storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ _form_qtext_record_cb(void *ctx, size_t *size)
{
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
QueryTextEntry *entry;
void *data;
void *data;
char *query_string;
char *ptr;

Expand Down Expand Up @@ -784,7 +784,7 @@ _deform_qtexts_record_cb(void *data, size_t size)
HASH_ENTER, &found);
Assert(!found);

entry->qtext_dp = dsa_allocate(qtext_dsa, len);
entry->qtext_dp = dsa_allocate_extended(qtext_dsa, len, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);
if (!_check_dsa_validity(entry->qtext_dp))
{
/*
Expand Down Expand Up @@ -829,7 +829,7 @@ aqo_qtexts_load(void)

if (!found)
{
if (!aqo_qtext_store(0, "COMMON feature space (do not delete!)"))
if (!aqo_qtext_store(0, "COMMON feature space (do not delete!)", NULL))
elog(PANIC, "[AQO] DSA Initialization was unsuccessful");
}
}
Expand Down Expand Up @@ -944,6 +944,48 @@ aqo_queries_load(void)
}
}

static long
aqo_get_file_size(const char *filename)
{
FILE *file;
long size = 0;

file = AllocateFile(filename, PG_BINARY_R);
if (file == NULL)
{
if (errno != ENOENT)
goto read_error;
return size;
}

fseek(file, 0L, SEEK_END);
size = ftell(file);

FreeFile(file);
return size;

read_error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", filename)));
if (file)
FreeFile(file);
unlink(filename);
return -1;
}

void
check_dsa_file_size(void)
{
long qtext_size = aqo_get_file_size(PGAQO_TEXT_FILE);
long data_size = aqo_get_file_size(PGAQO_DATA_FILE);

if (qtext_size == -1 || data_size == -1 || qtext_size + data_size >= dsm_size_max * 1024 * 1024)
{
elog(ERROR, "aqo.dsm_size_max is too small");
}
}

static void
data_load(const char *filename, deform_record_t callback, void *ctx)
{
Expand Down Expand Up @@ -1090,13 +1132,16 @@ dsa_init()
* XXX: Maybe merge with aqo_queries ?
*/
bool
aqo_qtext_store(uint64 queryid, const char *query_string)
aqo_qtext_store(uint64 queryid, const char *query_string, bool *dsa_valid)
{
QueryTextEntry *entry;
bool found;
bool tblOverflow;
HASHACTION action;

if (dsa_valid)
*dsa_valid = true;

Assert(!LWLockHeldByMe(&aqo_state->qtexts_lock));

if (query_string == NULL || querytext_max_size == 0)
Expand Down Expand Up @@ -1135,7 +1180,7 @@ aqo_qtext_store(uint64 queryid, const char *query_string)

entry->queryid = queryid;
size = size > querytext_max_size ? querytext_max_size : size;
entry->qtext_dp = dsa_allocate0(qtext_dsa, size);
entry->qtext_dp = dsa_allocate_extended(qtext_dsa, size, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);

if (!_check_dsa_validity(entry->qtext_dp))
{
Expand All @@ -1144,7 +1189,10 @@ aqo_qtext_store(uint64 queryid, const char *query_string)
* that caller recognize it and don't try to call us more.
*/
(void) hash_search(qtexts_htab, &queryid, HASH_REMOVE, NULL);
_aqo_queries_remove(queryid);
LWLockRelease(&aqo_state->qtexts_lock);
if (dsa_valid)
*dsa_valid = false;
return false;
}

Expand Down Expand Up @@ -1423,7 +1471,7 @@ aqo_data_store(uint64 fs, int fss, AqoDataArgs *data, List *reloids)
entry->nrels = nrels;

size = _compute_data_dsa(entry);
entry->data_dp = dsa_allocate0(data_dsa, size);
entry->data_dp = dsa_allocate_extended(data_dsa, size, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);

if (!_check_dsa_validity(entry->data_dp))
{
Expand Down Expand Up @@ -1455,7 +1503,7 @@ aqo_data_store(uint64 fs, int fss, AqoDataArgs *data, List *reloids)

/* Need to re-allocate DSA chunk */
dsa_free(data_dsa, entry->data_dp);
entry->data_dp = dsa_allocate0(data_dsa, size);
entry->data_dp = dsa_allocate_extended(data_dsa, size, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);

if (!_check_dsa_validity(entry->data_dp))
{
Expand Down Expand Up @@ -2713,7 +2761,7 @@ aqo_query_texts_update(PG_FUNCTION_ARGS)

str_buff = (char*) palloc(str_len);
text_to_cstring_buffer(str, str_buff, str_len);
res = aqo_qtext_store(queryid, str_buff);
res = aqo_qtext_store(queryid, str_buff, NULL);
pfree(str_buff);

PG_RETURN_BOOL(res);
Expand Down
3 changes: 2 additions & 1 deletion storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ extern StatEntry *aqo_stat_store(uint64 queryid, bool use_aqo,
extern void aqo_stat_flush(void);
extern void aqo_stat_load(void);

extern bool aqo_qtext_store(uint64 queryid, const char *query_string);
extern bool aqo_qtext_store(uint64 queryid, const char *query_string, bool *dsa_valid);
extern void aqo_qtexts_flush(void);
extern void aqo_qtexts_load(void);

Expand All @@ -156,6 +156,7 @@ extern bool aqo_queries_store(uint64 queryid, uint64 fs, bool learn_aqo,
extern void aqo_queries_flush(void);
extern void aqo_queries_load(void);

extern void check_dsa_file_size(void);
/*
* Machinery for deactivated queries cache.
* TODO: Should live in a custom memory context
Expand Down
76 changes: 76 additions & 0 deletions t/004_dsm_size_max.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use strict;
use warnings;

use Config;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;

use Test::More tests => 5;

my $node = PostgreSQL::Test::Cluster->new('aqotest');
$node->init;
$node->append_conf('postgresql.conf', qq{
shared_preload_libraries = 'aqo'
aqo.mode = 'learn'
log_statement = 'ddl'
aqo.join_threshold = 0
aqo.dsm_size_max = 4
aqo.fs_max_items = 30000
aqo.querytext_max_size = 1000000
});

# Disable connection default settings, forced by PGOPTIONS in AQO Makefile
$ENV{PGOPTIONS}="";

# General purpose variables.
my $long_string = 'a' x 1000000;

$node->start();
$node->psql('postgres', 'CREATE EXTENSION aqo;');

for my $i (1 .. 3) {
$node->psql('postgres', "select aqo_query_texts_update(" . $i . ", \'" . $long_string . "\');");
}
$node->stop();

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '1');
is($node->start(fail_ok => 1),
0, "node fails to start");

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '4');
is($node->start(),
1, "node starts");
$node->psql('postgres', 'select * from aqo_reset();');

$long_string = '1, ' x 10000;
for my $i (1 .. 30) {
$node->psql('postgres', "select aqo_data_update(" . $i . ", 1, 1, '{{1}}', '{1}', '{1}', '{" . $long_string . " 1}');");
}
$node->stop();

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '1');
is($node->start(fail_ok => 1),
0, "node fails to start");

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '4');
is($node->start(),
1, "node starts");
$node->psql('postgres', 'select * from aqo_reset();');
$node->stop();

my $regex;
$long_string = 'a' x 100000;
$regex = qr/.*WARNING: \[AQO\] Not enough DSA\. AQO was disabled for this query/;
$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '1');
$node->start();
my ($stdout, $stderr);
for my $i (1 .. 20) {
$node->psql('postgres', "create table a as select s, md5(random()::text) from generate_Series(1,100) s;");
$node->psql('postgres',
"SELECT a.s FROM a CROSS JOIN ( SELECT '" . $long_string . "' as long_string) AS extra_rows;",
stdout => \$stdout, stderr => \$stderr);
$node->psql('postgres', "drop table a");
}
like($stderr, $regex, 'warning for exceeding the dsa limit');
$node->stop;
done_testing();

0 comments on commit 7a53c22

Please sign in to comment.