Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stable15 dsm fix #176

Merged
merged 3 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions aqo.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ _PG_init(void)
&dsm_size_max,
100,
0, INT_MAX,
PGC_SUSET,
0,
PGC_POSTMASTER,
GUC_UNIT_MB,
NULL,
NULL,
NULL
Expand Down Expand Up @@ -388,5 +388,5 @@ PG_FUNCTION_INFO_V1(invalidate_deactivated_queries_cache);
Datum
invalidate_deactivated_queries_cache(PG_FUNCTION_ARGS)
{
PG_RETURN_POINTER(NULL);
PG_RETURN_POINTER(NULL);
}
2 changes: 2 additions & 0 deletions aqo_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ aqo_init_shmem(void)
/* Doesn't use DSA, so can be loaded in postmaster */
aqo_stat_load();
aqo_queries_load();

check_dsa_file_size();
}
}

Expand Down
15 changes: 12 additions & 3 deletions preprocessing.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,23 @@ aqo_planner(Query *parse, const char *query_string, int cursorOptions,
query_context.learn_aqo, query_context.use_aqo,
query_context.auto_tuning, &aqo_queries_nulls))
{
bool dsa_valid = true;
/*
* Add query text into the ML-knowledge base. Just for further
* analysis. In the case of cached plans we may have NULL query text.
*/
if (!aqo_qtext_store(query_context.query_hash, query_string))
if (!aqo_qtext_store(query_context.query_hash, query_string, &dsa_valid))
{
Assert(0); /* panic only on debug installation */
elog(ERROR, "[AQO] Impossible situation was detected. Maybe not enough of shared memory?");
if (!dsa_valid)
{
disable_aqo_for_query();
elog(WARNING, "[AQO] Not enough DSA. AQO was disabled for this query");
}
else
{
Assert(0); /* panic only on debug installation */
elog(ERROR, "[AQO] Impossible situation was detected. Maybe not enough of shared memory?");
}
}
}
else
Expand Down
65 changes: 57 additions & 8 deletions storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ _form_qtext_record_cb(void *ctx, size_t *size)
{
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
QueryTextEntry *entry;
void *data;
void *data;
char *query_string;
char *ptr;

Expand Down Expand Up @@ -784,7 +784,7 @@ _deform_qtexts_record_cb(void *data, size_t size)
HASH_ENTER, &found);
Assert(!found);

entry->qtext_dp = dsa_allocate(qtext_dsa, len);
entry->qtext_dp = dsa_allocate_extended(qtext_dsa, len, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);
if (!_check_dsa_validity(entry->qtext_dp))
{
/*
Expand Down Expand Up @@ -829,7 +829,7 @@ aqo_qtexts_load(void)

if (!found)
{
if (!aqo_qtext_store(0, "COMMON feature space (do not delete!)"))
if (!aqo_qtext_store(0, "COMMON feature space (do not delete!)", NULL))
elog(PANIC, "[AQO] DSA Initialization was unsuccessful");
}
}
Expand Down Expand Up @@ -944,6 +944,49 @@ aqo_queries_load(void)
}
}

static long
aqo_get_file_size(const char *filename)
{
FILE *file;
long size = 0;

file = AllocateFile(filename, PG_BINARY_R);
if (file == NULL)
{
if (errno != ENOENT)
goto read_error;
return size;
}

fseek(file, 0L, SEEK_END);
size = ftell(file);

FreeFile(file);
return size;

read_error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", filename)));
if (file)
FreeFile(file);
unlink(filename);
return -1;
}

void
check_dsa_file_size(void)
{
long qtext_size = aqo_get_file_size(PGAQO_TEXT_FILE);
long data_size = aqo_get_file_size(PGAQO_DATA_FILE);

if (qtext_size == -1 || data_size == -1 ||
qtext_size + data_size >= dsm_size_max * 1024 * 1024)
{
elog(ERROR, "aqo.dsm_size_max is too small");
}
}

static void
data_load(const char *filename, deform_record_t callback, void *ctx)
{
Expand Down Expand Up @@ -1090,13 +1133,16 @@ dsa_init()
* XXX: Maybe merge with aqo_queries ?
*/
bool
aqo_qtext_store(uint64 queryid, const char *query_string)
aqo_qtext_store(uint64 queryid, const char *query_string, bool *dsa_valid)
{
QueryTextEntry *entry;
bool found;
bool tblOverflow;
HASHACTION action;

if (dsa_valid)
*dsa_valid = true;

Assert(!LWLockHeldByMe(&aqo_state->qtexts_lock));

if (query_string == NULL || querytext_max_size == 0)
Expand Down Expand Up @@ -1135,7 +1181,7 @@ aqo_qtext_store(uint64 queryid, const char *query_string)

entry->queryid = queryid;
size = size > querytext_max_size ? querytext_max_size : size;
entry->qtext_dp = dsa_allocate0(qtext_dsa, size);
entry->qtext_dp = dsa_allocate_extended(qtext_dsa, size, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);

if (!_check_dsa_validity(entry->qtext_dp))
{
Expand All @@ -1144,7 +1190,10 @@ aqo_qtext_store(uint64 queryid, const char *query_string)
* that caller recognize it and don't try to call us more.
*/
(void) hash_search(qtexts_htab, &queryid, HASH_REMOVE, NULL);
_aqo_queries_remove(queryid);
LWLockRelease(&aqo_state->qtexts_lock);
if (dsa_valid)
*dsa_valid = false;
return false;
}

Expand Down Expand Up @@ -1423,7 +1472,7 @@ aqo_data_store(uint64 fs, int fss, AqoDataArgs *data, List *reloids)
entry->nrels = nrels;

size = _compute_data_dsa(entry);
entry->data_dp = dsa_allocate0(data_dsa, size);
entry->data_dp = dsa_allocate_extended(data_dsa, size, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);

if (!_check_dsa_validity(entry->data_dp))
{
Expand Down Expand Up @@ -1455,7 +1504,7 @@ aqo_data_store(uint64 fs, int fss, AqoDataArgs *data, List *reloids)

/* Need to re-allocate DSA chunk */
dsa_free(data_dsa, entry->data_dp);
entry->data_dp = dsa_allocate0(data_dsa, size);
entry->data_dp = dsa_allocate_extended(data_dsa, size, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO);

if (!_check_dsa_validity(entry->data_dp))
{
Expand Down Expand Up @@ -2713,7 +2762,7 @@ aqo_query_texts_update(PG_FUNCTION_ARGS)

str_buff = (char*) palloc(str_len);
text_to_cstring_buffer(str, str_buff, str_len);
res = aqo_qtext_store(queryid, str_buff);
res = aqo_qtext_store(queryid, str_buff, NULL);
pfree(str_buff);

PG_RETURN_BOOL(res);
Expand Down
3 changes: 2 additions & 1 deletion storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ extern StatEntry *aqo_stat_store(uint64 queryid, bool use_aqo,
extern void aqo_stat_flush(void);
extern void aqo_stat_load(void);

extern bool aqo_qtext_store(uint64 queryid, const char *query_string);
extern bool aqo_qtext_store(uint64 queryid, const char *query_string, bool *dsa_valid);
extern void aqo_qtexts_flush(void);
extern void aqo_qtexts_load(void);

Expand All @@ -156,6 +156,7 @@ extern bool aqo_queries_store(uint64 queryid, uint64 fs, bool learn_aqo,
extern void aqo_queries_flush(void);
extern void aqo_queries_load(void);

extern void check_dsa_file_size(void);
/*
* Machinery for deactivated queries cache.
* TODO: Should live in a custom memory context
Expand Down
76 changes: 76 additions & 0 deletions t/004_dsm_size_max.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use strict;
use warnings;

use Config;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;

use Test::More tests => 5;

my $node = PostgreSQL::Test::Cluster->new('aqotest');
$node->init;
$node->append_conf('postgresql.conf', qq{
shared_preload_libraries = 'aqo'
aqo.mode = 'learn'
log_statement = 'ddl'
aqo.join_threshold = 0
aqo.dsm_size_max = 4
aqo.fs_max_items = 30000
aqo.querytext_max_size = 1000000
});

# Disable connection default settings, forced by PGOPTIONS in AQO Makefile
$ENV{PGOPTIONS}="";

# General purpose variables.
my $long_string = 'a' x 1000000;

$node->start();
$node->psql('postgres', 'CREATE EXTENSION aqo;');

for my $i (1 .. 3) {
$node->psql('postgres', "select aqo_query_texts_update(" . $i . ", \'" . $long_string . "\');");
}
$node->stop();

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '1');
is($node->start(fail_ok => 1),
0, "node fails to start");

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '4');
is($node->start(),
1, "node starts");
$node->psql('postgres', 'select * from aqo_reset();');

$long_string = '1, ' x 10000;
for my $i (1 .. 30) {
$node->psql('postgres', "select aqo_data_update(" . $i . ", 1, 1, '{{1}}', '{1}', '{1}', '{" . $long_string . " 1}');");
}
$node->stop();

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '1');
is($node->start(fail_ok => 1),
0, "node fails to start");

$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '4');
is($node->start(),
1, "node starts");
$node->psql('postgres', 'select * from aqo_reset();');
$node->stop();

my $regex;
$long_string = 'a' x 100000;
$regex = qr/.*WARNING: \[AQO\] Not enough DSA\. AQO was disabled for this query/;
$node->adjust_conf('postgresql.conf', 'aqo.dsm_size_max', '1');
$node->start();
my ($stdout, $stderr);
for my $i (1 .. 20) {
$node->psql('postgres', "create table a as select s, md5(random()::text) from generate_Series(1,100) s;");
$node->psql('postgres',
"SELECT a.s FROM a CROSS JOIN ( SELECT '" . $long_string . "' as long_string) AS extra_rows;",
stdout => \$stdout, stderr => \$stderr);
$node->psql('postgres', "drop table a");
}
like($stderr, $regex, 'warning for exceeding the dsa limit');
$node->stop;
done_testing();
Loading