Skip to content

Commit

Permalink
arrow_fdw: add 'pattern' option
Browse files Browse the repository at this point in the history
issue #834
Right now, it works just as a filter of files.
  • Loading branch information
kaigai committed Oct 23, 2024
1 parent b53a0f5 commit 4cf5ef8
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 11 deletions.
5 changes: 4 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ include Makefile.cuda
#
# Installation Scripts
#
__STROM_SQL = pg_strom--4.0--5.0.sql pg_strom--5.0.sql pg_strom--5.0--5.1.sql
__STROM_SQL = pg_strom--4.0--5.0.sql \
pg_strom--5.0.sql \
pg_strom--5.0--5.1.sql \
pg_strom--5.1--5.3.sql
STROM_SQL = $(addprefix sql/,$(__STROM_SQL))

#
Expand Down
167 changes: 158 additions & 9 deletions src/arrow_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -2113,19 +2113,90 @@ GetOptimalDpuForArrowFdw(PlannerInfo *root, RelOptInfo *baserel)
return ds_entry;
}

/*
* arrowFdwExcludeFileNamesByPattern
*/
static List *
arrowFdwExcludeFileNamesByPattern(List *filesList,
const char *pattern,
List **p_filesAttrsList)
{
List *results = NIL; /* only valid files */
List *attrsList = NIL;
ListCell *lc;

foreach (lc, filesList)
{
String *path = lfirst(lc);
List *attrKeys = NIL;
List *attrValues = NIL;

if (pathNameMatchByPattern(strVal(path),
pattern,
&attrKeys,
&attrValues))
{
if (p_filesAttrsList)
{
ListCell *lc1, *lc2;
List *kv_list = NIL;

forboth (lc1, attrKeys,
lc2, attrValues)
{
const char *key = lfirst(lc1);
const char *value = lfirst(lc2);
int key_len = strlen(key);
int value_len = strlen(value);
char *pos;
ArrowKeyValue *kv;

kv = palloc(MAXALIGN(sizeof(ArrowKeyValue)) +
MAXALIGN(key_len+1) +
MAXALIGN(value_len+1));
__initArrowNode(&kv->node, ArrowNodeTag__KeyValue);
pos = ((char *)kv + MAXALIGN(sizeof(ArrowKeyValue)));
strcpy(pos, key);
kv->key = pos;
kv->_key_len = key_len;

pos += MAXALIGN(key_len+1);
strcpy(pos, value);
kv->value = pos;
kv->_value_len = value_len;

kv_list = lappend(kv_list, kv);
}
attrsList = lappend(attrsList, kv_list);
}
results = lappend(results, path);

list_free_deep(attrKeys);
list_free_deep(attrValues);
}
}
if (p_filesAttrsList)
{
Assert(list_length(results) == list_length(attrsList));
*p_filesAttrsList = attrsList;
}
return results;
}

/*
* arrowFdwExtractFilesList
*/
static List *
arrowFdwExtractFilesList(List *options_list,
List **p_filesAttrList,
int *p_parallel_nworkers)
{

ListCell *lc;
List *filesList = NIL;
char *dir_path = NULL;
char *dir_suffix = NULL;
char *pattern = NULL;
int parallel_nworkers = -1;
ListCell *lc;

foreach (lc, options_list)
{
Expand Down Expand Up @@ -2174,6 +2245,12 @@ arrowFdwExtractFilesList(List *options_list,
elog(ERROR, "'parallel_workers' appeared twice");
parallel_nworkers = atoi(strVal(defel->arg));
}
else if (strcmp(defel->defname, "pattern") == 0)
{
if (pattern)
elog(ERROR, "'pattern' appeared twice");
pattern = strVal(defel->arg);
}
else
elog(ERROR, "arrow: unknown option (%s)", defel->defname);
}
Expand Down Expand Up @@ -2209,7 +2286,21 @@ arrowFdwExtractFilesList(List *options_list,
}
FreeDir(dir);
}
/* exclude the file names by pattern */
if (pattern)
{
filesList = arrowFdwExcludeFileNamesByPattern(filesList, pattern,
p_filesAttrList);
}
else if (p_filesAttrList)
{
/* add empty file attributes list for forboth() macro */
List *filesAttrList = NIL;

foreach (lc, filesList)
filesAttrList = lappend(filesAttrList, NULL);
*p_filesAttrList = filesAttrList;
}
if (p_parallel_nworkers)
*p_parallel_nworkers = parallel_nworkers;
return filesList;
Expand Down Expand Up @@ -2633,7 +2724,8 @@ ArrowGetForeignRelSize(PlannerInfo *root,
referenced = pickup_outer_referenced(root, baserel, referenced);

/* read arrow-file metadata */
filesList = arrowFdwExtractFilesList(ft->options, &parallel_nworkers);
filesList = arrowFdwExtractFilesList(ft->options, NULL,
&parallel_nworkers);
foreach (lc1, filesList)
{
ArrowFileState *af_state;
Expand Down Expand Up @@ -3466,6 +3558,7 @@ __arrowFdwExecInit(ScanState *ss,
const DpuStorageEntry *ds_entry = NULL;
bool whole_row_ref = false;
List *filesList;
List *filesAttrList;
List *af_states_list = NIL;
uint32_t rb_nrooms = 0;
uint32_t rb_nitems = 0;
Expand All @@ -3488,7 +3581,7 @@ __arrowFdwExecInit(ScanState *ss,
}

/* setup ArrowFileState */
filesList = arrowFdwExtractFilesList(ft->options, NULL);
filesList = arrowFdwExtractFilesList(ft->options, &filesAttrList, NULL);
foreach (lc1, filesList)
{
char *fname = strVal(lfirst(lc1));
Expand Down Expand Up @@ -4117,14 +4210,16 @@ ArrowAcquireSampleRows(Relation relation,
double *p_totaldeadrows)
{
ForeignTable *ft = GetForeignTable(RelationGetRelid(relation));
List *filesList = arrowFdwExtractFilesList(ft->options, NULL);
List *filesList;
List *filesAttrList;
List *rb_state_list = NIL;
ListCell *lc1, *lc2;
int64 total_nrows = 0;
int64 count_nrows = 0;
int nsamples_min = nrooms / 100;
int nitems = 0;

filesList = arrowFdwExtractFilesList(ft->options, &filesAttrList, NULL);
foreach (lc1, filesList)
{
ArrowFileState *af_state;
Expand Down Expand Up @@ -4177,10 +4272,12 @@ ArrowAnalyzeForeignTable(Relation frel,
BlockNumber *p_totalpages)
{
ForeignTable *ft = GetForeignTable(RelationGetRelid(frel));
List *filesList = arrowFdwExtractFilesList(ft->options, NULL);
List *filesList;
List *filesAttrList;
ListCell *lc;
size_t totalpages = 0;

filesList = arrowFdwExtractFilesList(ft->options, &filesAttrList, NULL);
foreach (lc, filesList)
{
const char *fname = strVal(lfirst(lc));
Expand Down Expand Up @@ -4240,6 +4337,7 @@ ArrowImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
ArrowSchema schema;
List *filesList;
List *filesAttrList;
ListCell *lc;
const char **column_names;
StringInfoData cmd;
Expand All @@ -4259,7 +4357,7 @@ ArrowImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
elog(ERROR, "arrow_fdw: Bug? unknown list-type");
break;
}
filesList = arrowFdwExtractFilesList(stmt->options, NULL);
filesList = arrowFdwExtractFilesList(stmt->options, &filesAttrList, NULL);
if (filesList == NIL)
ereport(ERROR,
(errmsg("No valid apache arrow files are specified"),
Expand Down Expand Up @@ -4564,9 +4662,11 @@ pgstrom_arrow_fdw_validator(PG_FUNCTION_ARGS)

if (catalog == ForeignTableRelationId)
{
List *filesList = arrowFdwExtractFilesList(options, NULL);
List *filesList;
List *filesAttrList;
ListCell *lc;

filesList = arrowFdwExtractFilesList(options, &filesAttrList, NULL);
foreach (lc, filesList)
{
const char *fname = strVal(lfirst(lc));
Expand Down Expand Up @@ -4656,8 +4756,10 @@ pgstrom_arrow_fdw_precheck_schema(PG_FUNCTION_ARGS)
if (check_schema_compatibility)
{
ForeignTable *ft = GetForeignTable(RelationGetRelid(frel));
List *filesList = arrowFdwExtractFilesList(ft->options, NULL);
List *filesList;
List *filesAttrList;

filesList = arrowFdwExtractFilesList(ft->options, &filesAttrList, NULL);
foreach (lc, filesList)
{
const char *fname = strVal(lfirst(lc));
Expand All @@ -4670,6 +4772,53 @@ pgstrom_arrow_fdw_precheck_schema(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}

/*
 * pgstrom_arrow_fdw_check_pattern
 *
 * SQL-callable function that tests the path name (1st text argument)
 * against the pattern (2nd text argument) by pathNameMatchByPattern().
 * It returns a text value:
 *   "false"                           ... if the path name did not match
 *   "true"                            ... if matched without any attributes
 *   "true {[key]=[value], ...}"       ... if matched with attributes
 */
PG_FUNCTION_INFO_V1(pgstrom_arrow_fdw_check_pattern);
PUBLIC_FUNCTION(Datum)
pgstrom_arrow_fdw_check_pattern(PG_FUNCTION_ARGS)
{
	text	   *arg_path = PG_GETARG_TEXT_P(0);
	text	   *arg_pattern = PG_GETARG_TEXT_P(1);
	List	   *keys = NIL;
	List	   *values = NIL;
	ListCell   *kcell, *vcell;
	bool		matched;
	int			nattrs = 0;
	StringInfoData out;

	matched = pathNameMatchByPattern(text_to_cstring(arg_path),
									 text_to_cstring(arg_pattern),
									 &keys,
									 &values);
	initStringInfo(&out);
	if (!matched)
	{
		appendStringInfo(&out, "false");
		PG_RETURN_TEXT_P(cstring_to_text(out.data));
	}

	appendStringInfo(&out, "true");
	forboth (kcell, keys,
			 vcell, values)
	{
		/* open the brace on the first pair, put a separator afterwards */
		if (nattrs++ == 0)
			appendStringInfo(&out, " {");
		else
			appendStringInfo(&out, ", ");
		appendStringInfo(&out, "[%s]=[%s]",
						 (char *)lfirst(kcell),
						 (char *)lfirst(vcell));
	}
	/* close the brace only if at least one pair was printed */
	if (nattrs > 0)
		appendStringInfo(&out, "}");

	PG_RETURN_TEXT_P(cstring_to_text(out.data));
}

/*
* pgstrom_request_arrow_fdw
*/
Expand Down
Loading

0 comments on commit 4cf5ef8

Please sign in to comment.