Skip to content

Commit

Permalink
add pts->num_scan_repeats to support multiple-scans by inner-buffer p…
Browse files Browse the repository at this point in the history
…artitions
  • Loading branch information
kaigai committed Sep 18, 2024
1 parent 8876593 commit ff687d7
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 125 deletions.
31 changes: 21 additions & 10 deletions src/arrow_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -3628,15 +3628,22 @@ ArrowBeginForeignScan(ForeignScanState *node, int eflags)
* ExecArrowScanChunk
*/
static inline RecordBatchState *
__arrowFdwNextRecordBatch(ArrowFdwState *arrow_state)
__arrowFdwNextRecordBatch(ArrowFdwState *arrow_state,
int32_t num_scan_repeats,
int32_t *p_scan_repeat_id)
{
RecordBatchState *rb_state;
uint32_t raw_index;
uint32_t rb_index;

Assert(num_scan_repeats > 0);
retry:
rb_index = pg_atomic_fetch_add_u32(arrow_state->rbatch_index, 1);
if (rb_index >= arrow_state->rb_nitems)
raw_index = pg_atomic_fetch_add_u32(arrow_state->rbatch_index, 1);
if (raw_index >= arrow_state->rb_nitems * num_scan_repeats)
return NULL; /* no more chunks to load */
rb_index = (raw_index % num_scan_repeats);
if (p_scan_repeat_id)
*p_scan_repeat_id = (raw_index / num_scan_repeats);
rb_state = arrow_state->rb_states[rb_index];
if (arrow_state->stats_hint)
{
Expand All @@ -3662,12 +3669,15 @@ pgstromScanChunkArrowFdw(pgstromTaskState *pts,
RecordBatchState *rb_state;
ArrowFileState *af_state;
strom_io_vector *iovec;
XpuCommand *xcmd;
uint32_t kds_src_offset;
uint32_t kds_src_iovec;
uint32_t kds_src_pathname;

rb_state = __arrowFdwNextRecordBatch(arrow_state);
XpuCommand *xcmd;
uint32_t kds_src_offset;
uint32_t kds_src_iovec;
uint32_t kds_src_pathname;
int32_t scan_repeat_id;

rb_state = __arrowFdwNextRecordBatch(arrow_state,
pts->num_scan_repeats,
&scan_repeat_id);
if (!rb_state)
{
pts->scan_done = true;
Expand Down Expand Up @@ -3704,6 +3714,7 @@ pgstromScanChunkArrowFdw(pgstromTaskState *pts,
xcmd->u.task.kds_src_pathname = kds_src_pathname;
xcmd->u.task.kds_src_iovec = kds_src_iovec;
xcmd->u.task.kds_src_offset = kds_src_offset;
xcmd->u.task.scan_repeat_id = scan_repeat_id;

xcmd_iov->iov_base = xcmd;
xcmd_iov->iov_len = xcmd->length;
Expand All @@ -3729,7 +3740,7 @@ ArrowIterateForeignScan(ForeignScanState *node)

arrow_state->curr_index = 0;
arrow_state->curr_kds = NULL;
rb_state = __arrowFdwNextRecordBatch(arrow_state);
rb_state = __arrowFdwNextRecordBatch(arrow_state, 1, NULL);
if (!rb_state)
return NULL;
arrow_state->curr_kds
Expand Down
47 changes: 11 additions & 36 deletions src/brin.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ typedef struct
volatile int build_status;
slock_t lock; /* once 'build_status' is set, no need to take
* this lock again. */
pg_atomic_uint32 index;
pg_atomic_uint64 index;
uint32_t nitems;
BlockNumber chunks[FLEXIBLE_ARRAY_MEMBER];
} BrinIndexResults;
Expand Down Expand Up @@ -567,7 +567,7 @@ pgstromBrinIndexExecReset(pgstromTaskState *pts)

br_results->build_status = 0;
br_results->nitems = 0;
pg_atomic_init_u32(&br_results->index, 0);
pg_atomic_init_u64(&br_results->index, 0);
}

/*
Expand Down Expand Up @@ -913,54 +913,29 @@ __BrinIndexGetResults(pgstromTaskState *pts)
return br_results;
}

TBMIterateResult *
pgstromBrinIndexNextBlock(pgstromTaskState *pts)
{
BrinIndexState *br_state = pts->br_state;
BrinIndexResults *br_results = __BrinIndexGetResults(pts);
uint32_t index;
BlockNumber blockno;

if (br_state->curr_block_id >= br_state->pagesPerRange)
{
index = pg_atomic_fetch_add_u32(&br_results->index, 1);
if (index >= br_results->nitems)
return NULL;
br_state->curr_chunk_id = br_results->chunks[index];
br_state->curr_block_id = 0;
}
blockno = (br_state->curr_chunk_id * br_state->pagesPerRange +
br_state->curr_block_id++);
if (blockno >= br_state->nblocks)
return NULL;

br_state->tbmres.blockno = blockno;
br_state->tbmres.ntuples = -1;
br_state->tbmres.recheck = true;
return &br_state->tbmres;
}

bool
int
pgstromBrinIndexNextChunk(pgstromTaskState *pts)
{
BrinIndexState *br_state = pts->br_state;
BrinIndexResults *br_results = __BrinIndexGetResults(pts);
uint32_t index;
uint64_t raw_index;

index = pg_atomic_fetch_add_u32(&br_results->index, 1);
if (index < br_results->nitems)
again:
raw_index = pg_atomic_fetch_add_u64(&br_results->index, 1);
if (raw_index < br_results->nitems * pts->num_scan_repeats)
{
BlockNumber pagesPerRange = br_state->pagesPerRange;
uint32_t index = (raw_index % br_results->nitems);

pts->curr_block_num = br_results->chunks[index] * pagesPerRange;
pts->curr_block_tail = pts->curr_block_num + pagesPerRange;
if (pts->curr_block_num >= br_state->nblocks)
return false;
goto again;
if (pts->curr_block_tail > br_state->nblocks)
pts->curr_block_tail = br_state->nblocks;
return true;
return (raw_index / br_results->nitems);
}
return false;
return -1;
}

void
Expand Down
4 changes: 4 additions & 0 deletions src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,11 @@ pgstromCreateTaskState(CustomScan *cscan,
pts->xpu_task_flags = pp_info->xpu_task_flags;
pts->pp_info = pp_info;
Assert(pp_info->num_rels == num_rels);
pts->num_scan_repeats = 1;
pts->num_rels = num_rels;
pts->curr_tbm = palloc0(offsetof(TBMIterateResult, offsets) +
sizeof(OffsetNumber) * MaxHeapTuplesPerPage);
pts->curr_repeat_id = -1;

return (Node *)pts;
}
Expand Down
14 changes: 12 additions & 2 deletions src/gpu_join.c
Original file line number Diff line number Diff line change
Expand Up @@ -2470,6 +2470,7 @@ GpuJoinInnerPreload(pgstromTaskState *pts)
{
pgstromTaskState *leader = pts;
pgstromSharedState *ps_state;
kern_buffer_partitions *kbuf_parts;
MemoryContext memcxt;

//pick up leader's ps_state if partitionwise plan
Expand Down Expand Up @@ -2677,8 +2678,17 @@ GpuJoinInnerPreload(pgstromTaskState *pts)
pts->ds_entry);
}

//TODO: send the shmem handle to the GPU server or DPU server

/*
* Inner-buffer partitioning often requires multiple outer-scan,
* if number of partitions is larger than the number of GPU devices.
*/
kbuf_parts = KERN_MULTIRELS_PARTITION_DESC(pts->h_kmrels, -1);
if (kbuf_parts)
{
pts->num_scan_repeats = (kbuf_parts->hash_divisor +
numGpuDevAttrs - 1) / numGpuDevAttrs;
assert(pts->num_scan_repeats > 0);
}
break;

default:
Expand Down
11 changes: 6 additions & 5 deletions src/pg_strom.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ struct pgstromTaskState
int64_t curr_index;
bool scan_done;
bool final_done;
uint32_t num_scan_repeats;

/* base relation scan, if any */
TupleTableSlot *base_slot;
Expand All @@ -474,16 +475,17 @@ struct pgstromTaskState
size_t fallback_usage;
size_t fallback_bufsz;
char *fallback_buffer;
TupleTableSlot *fallback_slot; /* host-side kvars-slot */
TupleTableSlot *fallback_slot; /* host-side kvars-slot */
List *fallback_proj;
List *fallback_load_src; /* source resno of base-rel */
List *fallback_load_dst; /* dest resno of fallback-slot */
bytea *kern_fallback_desc;
/* request command buffer (+ status for table scan) */
TBMIterateResult *curr_tbm;
int32_t curr_repeat_id; /* for KDS_FORMAT_ROW */
Buffer curr_vm_buffer; /* for visibility-map */
BlockNumber curr_block_num; /* for KDS_FORMAT_BLOCK */
BlockNumber curr_block_tail; /* for KDS_FORMAT_BLOCK */
uint64_t curr_block_num; /* for KDS_FORMAT_BLOCK */
uint64_t curr_block_tail; /* for KDS_FORMAT_BLOCK */
StringInfoData xcmd_buf;
/* callbacks */
TupleTableSlot *(*cb_next_tuple)(struct pgstromTaskState *pts);
Expand Down Expand Up @@ -631,8 +633,7 @@ extern void pgstromBrinIndexExecBegin(pgstromTaskState *pts,
Oid index_oid,
List *index_conds,
List *index_quals);
extern bool pgstromBrinIndexNextChunk(pgstromTaskState *pts);
extern TBMIterateResult *pgstromBrinIndexNextBlock(pgstromTaskState *pts);
extern int pgstromBrinIndexNextChunk(pgstromTaskState *pts);
extern void pgstromBrinIndexExecEnd(pgstromTaskState *pts);
extern void pgstromBrinIndexExecReset(pgstromTaskState *pts);
extern Size pgstromBrinIndexEstimateDSM(pgstromTaskState *pts);
Expand Down
Loading

0 comments on commit ff687d7

Please sign in to comment.