From ff687d74fe952d87da68cb746d465da9e4f51ce8 Mon Sep 17 00:00:00 2001
From: KaiGai Kohei
Date: Thu, 19 Sep 2024 00:05:42 +0900
Subject: [PATCH] add pts->num_scan_repeats to support multiple scans with
 inner-buffer partitions

---
 src/arrow_fdw.c  |  31 +++++--
 src/brin.c       |  47 +++-------
 src/executor.c   |   4 +
 src/gpu_join.c   |  14 ++-
 src/pg_strom.h   |  11 ++-
 src/relscan.c    | 232 ++++++++++++++++++++++++++++++++---------------
 src/xpu_common.h |   1 +
 7 files changed, 215 insertions(+), 125 deletions(-)

diff --git a/src/arrow_fdw.c b/src/arrow_fdw.c
index cbbee7c0..af1b5437 100644
--- a/src/arrow_fdw.c
+++ b/src/arrow_fdw.c
@@ -3628,15 +3628,22 @@ ArrowBeginForeignScan(ForeignScanState *node, int eflags)
  * ExecArrowScanChunk
  */
 static inline RecordBatchState *
-__arrowFdwNextRecordBatch(ArrowFdwState *arrow_state)
+__arrowFdwNextRecordBatch(ArrowFdwState *arrow_state,
+						  int32_t num_scan_repeats,
+						  int32_t *p_scan_repeat_id)
 {
 	RecordBatchState *rb_state;
+	uint32_t	raw_index;
 	uint32_t	rb_index;
 
+	Assert(num_scan_repeats > 0);
 retry:
-	rb_index = pg_atomic_fetch_add_u32(arrow_state->rbatch_index, 1);
-	if (rb_index >= arrow_state->rb_nitems)
+	raw_index = pg_atomic_fetch_add_u32(arrow_state->rbatch_index, 1);
+	if (raw_index >= arrow_state->rb_nitems * num_scan_repeats)
 		return NULL;	/* no more chunks to load */
+	rb_index = (raw_index % arrow_state->rb_nitems);
+	if (p_scan_repeat_id)
+		*p_scan_repeat_id = (raw_index / arrow_state->rb_nitems);
 	rb_state = arrow_state->rb_states[rb_index];
 	if (arrow_state->stats_hint)
 	{
@@ -3662,12 +3669,15 @@ pgstromScanChunkArrowFdw(pgstromTaskState *pts,
 	RecordBatchState *rb_state;
 	ArrowFileState *af_state;
 	strom_io_vector *iovec;
-	XpuCommand *xcmd;
-	uint32_t	kds_src_offset;
-	uint32_t	kds_src_iovec;
-	uint32_t	kds_src_pathname;
-
-	rb_state = __arrowFdwNextRecordBatch(arrow_state);
+	XpuCommand *xcmd;
+	uint32_t	kds_src_offset;
+	uint32_t	kds_src_iovec;
+	uint32_t	kds_src_pathname;
+	int32_t		scan_repeat_id;
+
+	rb_state = __arrowFdwNextRecordBatch(arrow_state,
+										 pts->num_scan_repeats,
+										 &scan_repeat_id);
 	if (!rb_state)
 	{
 		pts->scan_done = true;
@@ -3704,6 +3714,7 @@ pgstromScanChunkArrowFdw(pgstromTaskState *pts,
 	xcmd->u.task.kds_src_pathname = kds_src_pathname;
 	xcmd->u.task.kds_src_iovec = kds_src_iovec;
 	xcmd->u.task.kds_src_offset = kds_src_offset;
+	xcmd->u.task.scan_repeat_id = scan_repeat_id;
 
 	xcmd_iov->iov_base = xcmd;
 	xcmd_iov->iov_len  = xcmd->length;
@@ -3729,7 +3740,7 @@ ArrowIterateForeignScan(ForeignScanState *node)
 		arrow_state->curr_index = 0;
 		arrow_state->curr_kds = NULL;
 
-		rb_state = __arrowFdwNextRecordBatch(arrow_state);
+		rb_state = __arrowFdwNextRecordBatch(arrow_state, 1, NULL);
 		if (!rb_state)
 			return NULL;
 		arrow_state->curr_kds
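The round-robin arithmetic above is easy to get backwards, so here is a minimal
standalone model of how the single shared atomic counter fans out into a
record-batch index plus a repeat-id (plain C; nitems and nrepeats are
hypothetical values, not taken from the patch):

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        uint32_t nitems = 3;        /* stands in for arrow_state->rb_nitems */
        uint32_t nrepeats = 2;      /* stands in for pts->num_scan_repeats */

        for (uint32_t raw = 0; raw < nitems * nrepeats; raw++)
        {
            uint32_t rb_index  = raw % nitems;   /* visits 0,1,2,0,1,2 */
            uint32_t repeat_id = raw / nitems;   /* yields 0,0,0,1,1,1 */

            printf("raw=%u -> rb_index=%u repeat_id=%u\n",
                   raw, rb_index, repeat_id);
        }
        return 0;
    }

Every record-batch is visited once per repeat, and all chunks fetched while
raw / nitems is unchanged carry the same scan_repeat_id, which is what lets
the device side pair each chunk with one set of inner-buffer partitions.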
diff --git a/src/brin.c b/src/brin.c
index 420e88f2..a0b492fe 100644
--- a/src/brin.c
+++ b/src/brin.c
@@ -449,7 +449,7 @@ typedef struct
 	volatile int	build_status;
 	slock_t			lock;	/* once 'build_status' is set, no need to take
 							 * this lock again. */
-	pg_atomic_uint32 index;
+	pg_atomic_uint64 index;
 	uint32_t		nitems;
 	BlockNumber		chunks[FLEXIBLE_ARRAY_MEMBER];
 } BrinIndexResults;
@@ -567,7 +567,7 @@ pgstromBrinIndexExecReset(pgstromTaskState *pts)
 
 	br_results->build_status = 0;
 	br_results->nitems = 0;
-	pg_atomic_init_u32(&br_results->index, 0);
+	pg_atomic_init_u64(&br_results->index, 0);
 }
 
 /*
@@ -913,54 +913,29 @@ __BrinIndexGetResults(pgstromTaskState *pts)
 	return br_results;
 }
 
-TBMIterateResult *
-pgstromBrinIndexNextBlock(pgstromTaskState *pts)
-{
-	BrinIndexState *br_state = pts->br_state;
-	BrinIndexResults *br_results = __BrinIndexGetResults(pts);
-	uint32_t	index;
-	BlockNumber	blockno;
-
-	if (br_state->curr_block_id >= br_state->pagesPerRange)
-	{
-		index = pg_atomic_fetch_add_u32(&br_results->index, 1);
-		if (index >= br_results->nitems)
-			return NULL;
-		br_state->curr_chunk_id = br_results->chunks[index];
-		br_state->curr_block_id = 0;
-	}
-	blockno = (br_state->curr_chunk_id * br_state->pagesPerRange +
-			   br_state->curr_block_id++);
-	if (blockno >= br_state->nblocks)
-		return NULL;
-
-	br_state->tbmres.blockno = blockno;
-	br_state->tbmres.ntuples = -1;
-	br_state->tbmres.recheck = true;
-	return &br_state->tbmres;
-}
-
-bool
+int
 pgstromBrinIndexNextChunk(pgstromTaskState *pts)
 {
 	BrinIndexState *br_state = pts->br_state;
 	BrinIndexResults *br_results = __BrinIndexGetResults(pts);
-	uint32_t	index;
+	uint64_t	raw_index;
 
-	index = pg_atomic_fetch_add_u32(&br_results->index, 1);
-	if (index < br_results->nitems)
+again:
+	raw_index = pg_atomic_fetch_add_u64(&br_results->index, 1);
+	if (raw_index < (uint64_t)br_results->nitems * pts->num_scan_repeats)
 	{
 		BlockNumber	pagesPerRange = br_state->pagesPerRange;
+		uint32_t	index = (raw_index % br_results->nitems);
 
 		pts->curr_block_num = br_results->chunks[index] * pagesPerRange;
 		pts->curr_block_tail = pts->curr_block_num + pagesPerRange;
 		if (pts->curr_block_num >= br_state->nblocks)
-			return false;
+			goto again;
 		if (pts->curr_block_tail > br_state->nblocks)
 			pts->curr_block_tail = br_state->nblocks;
-		return true;
+		return (raw_index / br_results->nitems);
 	}
-	return false;
+	return -1;
 }
 
 void
diff --git a/src/executor.c b/src/executor.c
index 2ff788b5..02424d7a 100644
--- a/src/executor.c
+++ b/src/executor.c
@@ -1406,7 +1406,11 @@ pgstromCreateTaskState(CustomScan *cscan,
 	pts->xpu_task_flags = pp_info->xpu_task_flags;
 	pts->pp_info = pp_info;
 	Assert(pp_info->num_rels == num_rels);
+	pts->num_scan_repeats = 1;
 	pts->num_rels = num_rels;
+	pts->curr_tbm = palloc0(offsetof(TBMIterateResult, offsets) +
+							sizeof(OffsetNumber) * MaxHeapTuplesPerPage);
+	pts->curr_repeat_id = -1;
 
 	return (Node *)pts;
 }
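pgstromBrinIndexNextChunk() now returns the repeat-id of the chunk it selected
(or -1 at end of scan) instead of a bool, and an out-of-range chunk is skipped
with goto again rather than ending the scan, as it did before when returning
false. A compact model of that control flow (standalone C; the chunk numbers
and sizes are made up for illustration, and the atomic fetch-add is reduced to
a plain counter):

    #include <stdint.h>
    #include <stdio.h>

    /* compact model of pgstromBrinIndexNextChunk(): returns the repeat-id
     * of the next chunk, or -1 once all chunks were visited nrepeats times */
    static uint64_t counter = 0;                /* stands in for br_results->index */
    static const uint32_t chunks[] = {0, 4, 9}; /* hypothetical chunk numbers */
    static const uint32_t nitems   = 3;
    static const uint32_t nrepeats = 2;
    static const uint32_t nblocks  = 8;         /* chunk 9 lies beyond the relation */

    static int next_chunk(uint32_t *chunk)
    {
        uint64_t raw;
    again:
        raw = counter++;
        if (raw >= (uint64_t)nitems * nrepeats)
            return -1;                          /* end of scan */
        *chunk = chunks[raw % nitems];
        if (*chunk >= nblocks)
            goto again;                         /* out-of-range: skip, don't stop */
        return (int)(raw / nitems);             /* repeat-id of this chunk */
    }

    int main(void)
    {
        uint32_t chunk;
        int repeat_id;

        while ((repeat_id = next_chunk(&chunk)) >= 0)
            printf("chunk=%u repeat_id=%d\n", chunk, repeat_id);
        return 0;
    }

The shared counter is also widened to pg_atomic_uint64 for the same reason:
it now has to run up to nitems * num_scan_repeats, so the comparison is done
in 64-bit to stay clear of 32-bit wrap-around.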
diff --git a/src/gpu_join.c b/src/gpu_join.c
index 35adc8f3..05a4bf28 100644
--- a/src/gpu_join.c
+++ b/src/gpu_join.c
@@ -2470,6 +2470,7 @@ GpuJoinInnerPreload(pgstromTaskState *pts)
 {
 	pgstromTaskState   *leader = pts;
 	pgstromSharedState *ps_state;
+	kern_buffer_partitions *kbuf_parts;
 	MemoryContext		memcxt;
 
 	//pick up leader's ps_state if partitionwise plan
@@ -2677,8 +2678,17 @@ GpuJoinInnerPreload(pgstromTaskState *pts)
 											  pts->ds_entry);
 			}
-			//TODO: send the shmem handle to the GPU server or DPU server
-
+			/*
+			 * Inner-buffer partitioning often requires multiple outer-scans,
+			 * if the number of partitions is larger than the number of GPU devices.
+			 */
+			kbuf_parts = KERN_MULTIRELS_PARTITION_DESC(pts->h_kmrels, -1);
+			if (kbuf_parts)
+			{
+				pts->num_scan_repeats = (kbuf_parts->hash_divisor +
+										 numGpuDevAttrs - 1) / numGpuDevAttrs;
+				Assert(pts->num_scan_repeats > 0);
+			}
 			break;
 
 		default:
diff --git a/src/pg_strom.h b/src/pg_strom.h
index d88452f0..a01e5622 100644
--- a/src/pg_strom.h
+++ b/src/pg_strom.h
@@ -462,6 +462,7 @@ struct pgstromTaskState
 	int64_t			curr_index;
 	bool			scan_done;
 	bool			final_done;
+	uint32_t		num_scan_repeats;
 
 	/* base relation scan, if any */
 	TupleTableSlot *base_slot;
@@ -474,16 +475,17 @@ struct pgstromTaskState
 	size_t			fallback_usage;
 	size_t			fallback_bufsz;
 	char		   *fallback_buffer;
-	TupleTableSlot *fallback_slot;		/* host-side kvars-slot */
+	TupleTableSlot *fallback_slot;	/* host-side kvars-slot */
 	List		   *fallback_proj;
 	List		   *fallback_load_src;	/* source resno of base-rel */
 	List		   *fallback_load_dst;	/* dest resno of fallback-slot */
 	bytea		   *kern_fallback_desc;
 	/* request command buffer (+ status for table scan) */
 	TBMIterateResult *curr_tbm;
+	int32_t			curr_repeat_id;		/* for KDS_FORMAT_ROW */
 	Buffer			curr_vm_buffer;		/* for visibility-map */
-	BlockNumber		curr_block_num;		/* for KDS_FORMAT_BLOCK */
-	BlockNumber		curr_block_tail;	/* for KDS_FORMAT_BLOCK */
+	uint64_t		curr_block_num;		/* for KDS_FORMAT_BLOCK */
+	uint64_t		curr_block_tail;	/* for KDS_FORMAT_BLOCK */
 	StringInfoData	xcmd_buf;
 	/* callbacks */
 	TupleTableSlot *(*cb_next_tuple)(struct pgstromTaskState *pts);
@@ -631,8 +633,7 @@ extern void	pgstromBrinIndexExecBegin(pgstromTaskState *pts,
 										  Oid index_oid,
 										  List *index_conds,
 										  List *index_quals);
-extern bool	pgstromBrinIndexNextChunk(pgstromTaskState *pts);
-extern TBMIterateResult *pgstromBrinIndexNextBlock(pgstromTaskState *pts);
+extern int	pgstromBrinIndexNextChunk(pgstromTaskState *pts);
 extern void	pgstromBrinIndexExecEnd(pgstromTaskState *pts);
 extern void	pgstromBrinIndexExecReset(pgstromTaskState *pts);
 extern Size	pgstromBrinIndexEstimateDSM(pgstromTaskState *pts);
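The repeat count chosen in GpuJoinInnerPreload() is simply the ceiling of
partitions over available GPUs, so every inner-buffer partition is processed
on some device during some pass. A sketch of that arithmetic (standalone C;
the configurations checked in main() are hypothetical):

    #include <assert.h>
    #include <stdio.h>

    /* ceiling division, as used for pts->num_scan_repeats above */
    static int scan_repeats(int hash_divisor, int num_gpu_devices)
    {
        return (hash_divisor + num_gpu_devices - 1) / num_gpu_devices;
    }

    int main(void)
    {
        assert(scan_repeats(1, 2) == 1);   /* fewer partitions than GPUs: one pass */
        assert(scan_repeats(4, 2) == 2);   /* 4 partitions on 2 GPUs: two passes   */
        assert(scan_repeats(5, 2) == 3);   /* a remainder still costs a full pass  */
        printf("ok\n");
        return 0;
    }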
diff --git a/src/relscan.c b/src/relscan.c
index 5c45166a..0c124fb9 100644
--- a/src/relscan.c
+++ b/src/relscan.c
@@ -365,6 +365,44 @@ estimate_kern_data_store(TupleDesc tupdesc)
  *
  * ----------------------------------------------------------------
  */
+static inline void
+__relScanInitStartBlock(pgstromTaskState *pts)
+{
+	Relation	relation = pts->css.ss.ss_currentRelation;
+	HeapScanDesc h_scan = (HeapScanDesc)pts->css.ss.ss_currentScanDesc;
+
+	if (!h_scan->rs_inited)
+	{
+		if (h_scan->rs_base.rs_parallel)
+		{
+			/* see table_block_parallelscan_startblock_init */
+			ParallelBlockTableScanDesc pb_scan =
+				(ParallelBlockTableScanDesc)h_scan->rs_base.rs_parallel;
+			BlockNumber		start_block = InvalidBlockNumber;
+
+		retry_parallel_init:
+			SpinLockAcquire(&pb_scan->phs_mutex);
+			if (pb_scan->phs_startblock == InvalidBlockNumber)
+			{
+				if (!pb_scan->base.phs_syncscan)
+					pb_scan->phs_startblock = 0;
+				else if (start_block != InvalidBlockNumber)
+					pb_scan->phs_startblock = start_block;
+				else
+				{
+					SpinLockRelease(&pb_scan->phs_mutex);
+					start_block = ss_get_location(relation, pb_scan->phs_nblocks);
+					Assert(start_block != InvalidBlockNumber);
+					goto retry_parallel_init;
+				}
+			}
+			h_scan->rs_nblocks = pb_scan->phs_nblocks;
+			h_scan->rs_startblock = pb_scan->phs_startblock;
+			SpinLockRelease(&pb_scan->phs_mutex);
+		}
+		h_scan->rs_inited = true;
+	}
+}
+
 #define __XCMD_KDS_SRC_OFFSET(buf)						\
 	(((XpuCommand *)((buf)->data))->u.task.kds_src_offset)
 #define __XCMD_GET_KDS_SRC(buf)							\
@@ -563,6 +601,10 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 	uint32_t	kds_src_pathname = 0;
 	uint32_t	kds_src_iovec = 0;
 	uint32_t	kds_nrooms;
+	int32_t		scan_repeat_id = -1;
+	uint64_t	scan_block_limit;
+
+	__relScanInitStartBlock(pts);
 
 	kds = __XCMD_GET_KDS_SRC(&pts->xcmd_buf);
 	kds_nrooms = (PGSTROM_CHUNK_SIZE -
@@ -581,6 +623,9 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 	strom_iovec->nr_chunks = 0;
 	strom_blknums = alloca(sizeof(BlockNumber) * kds_nrooms);
 	strom_nblocks = 0;
+
+	scan_block_limit = ((uint64_t)h_scan->rs_nblocks *
+						(uint64_t)pts->num_scan_repeats);
 	while (!pts->scan_done)
 	{
 		while (pts->curr_block_num < pts->curr_block_tail &&
@@ -617,7 +662,17 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 				}
 				LWLockRelease(bufLock);
 			}
-
+			/*
+			 * MEMO: When multiple scans are needed (pts->num_scan_repeats > 1),
+			 * kds_src should not mix blocks that come from different scans,
+			 * because they shall be JOIN'ed with different inner-buffer
+			 * partitions.
+			 */
+			if (scan_repeat_id < 0)
+				scan_repeat_id = pts->curr_block_num / h_scan->rs_nblocks;
+			else if (scan_repeat_id != pts->curr_block_num / h_scan->rs_nblocks)
+				goto out;
+
 			/*
 			 * MEMO: right now, we allow GPU Direct SQL for the all-visible
 			 * pages only, due to the restrictions about MVCC checks.
@@ -685,25 +740,24 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 		}
 		else if (pts->br_state)
 		{
-			if (!pgstromBrinIndexNextChunk(pts))
+			int		__next_repeat_id = pgstromBrinIndexNextChunk(pts);
+
+			if (__next_repeat_id < 0)
 				pts->scan_done = true;
+			else if (scan_repeat_id < 0)
+				scan_repeat_id = __next_repeat_id;
+			else if (scan_repeat_id != __next_repeat_id)
+				break;
 		}
 		else if (!h_scan->rs_base.rs_parallel)
 		{
 			/* single process scan */
 			BlockNumber		num_blocks = kds_nrooms - kds->nitems;
 
-			if (!h_scan->rs_inited)
-			{
-				h_scan->rs_cblock = 0;
-				h_scan->rs_inited = true;
-			}
-			pts->curr_block_num = h_scan->rs_cblock;
-			if (pts->curr_block_num >= h_scan->rs_nblocks)
+			if (pts->curr_block_num >= scan_block_limit)
 				pts->scan_done = true;
-			else if (pts->curr_block_num + num_blocks > h_scan->rs_nblocks)
-				num_blocks = h_scan->rs_nblocks - pts->curr_block_num;
-			h_scan->rs_cblock += num_blocks;
+			else if (pts->curr_block_num + num_blocks > scan_block_limit)
+				num_blocks = scan_block_limit - pts->curr_block_num;
 			pts->curr_block_tail = pts->curr_block_num + num_blocks;
 		}
 		else
@@ -713,36 +766,11 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 			/* parallel processes scan */
 			ParallelBlockTableScanDesc pb_scan =
 				(ParallelBlockTableScanDesc)h_scan->rs_base.rs_parallel;
 			BlockNumber		num_blocks = kds_nrooms - kds->nitems;
 
-			if (!h_scan->rs_inited)
-			{
-				/* see table_block_parallelscan_startblock_init */
-				BlockNumber		start_block = InvalidBlockNumber;
-
-			retry_parallel_init:
-				SpinLockAcquire(&pb_scan->phs_mutex);
-				if (pb_scan->phs_startblock == InvalidBlockNumber)
-				{
-					if (!pb_scan->base.phs_syncscan)
-						pb_scan->phs_startblock = 0;
-					else if (start_block != InvalidBlockNumber)
-						pb_scan->phs_startblock = start_block;
-					else
-					{
-						SpinLockRelease(&pb_scan->phs_mutex);
-						start_block = ss_get_location(relation, pb_scan->phs_nblocks);
-						goto retry_parallel_init;
-					}
-				}
-				h_scan->rs_nblocks = pb_scan->phs_nblocks;
-				h_scan->rs_startblock = pb_scan->phs_startblock;
-				SpinLockRelease(&pb_scan->phs_mutex);
-				h_scan->rs_inited = true;
-			}
 			pts->curr_block_num = pg_atomic_fetch_add_u64(&pb_scan->phs_nallocated,
 														  num_blocks);
-			if (pts->curr_block_num >= h_scan->rs_nblocks)
+			if (pts->curr_block_num >= scan_block_limit)
 				pts->scan_done = true;
-			else if (pts->curr_block_num + num_blocks > h_scan->rs_nblocks)
-				num_blocks = h_scan->rs_nblocks - pts->curr_block_num;
+			else if (pts->curr_block_num + num_blocks > scan_block_limit)
+				num_blocks = scan_block_limit - pts->curr_block_num;
 			pts->curr_block_tail = pts->curr_block_num + num_blocks;
 		}
 	}
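The guard added above means a kds_src chunk is flushed as soon as the next
block would belong to a different pass over the table, even if the chunk
still has room; correctness of the partitioned JOIN takes precedence over
chunk utilization. A standalone model of that flushing behaviour (all sizes
hypothetical, not from the patch):

    #include <stdint.h>
    #include <stdio.h>

    /*
     * Model of the repeat-id guard in pgstromRelScanChunkDirect(): blocks
     * are numbered 0 .. nblocks*nrepeats-1, and a chunk is flushed before
     * it would mix blocks belonging to two different passes.
     */
    int main(void)
    {
        uint64_t nblocks = 5, nrepeats = 2, chunk_rooms = 3;
        int32_t  repeat_id = -1;
        uint64_t block_num = 0, nloaded = 0;

        while (block_num < nblocks * nrepeats)
        {
            int32_t this_repeat = (int32_t)(block_num / nblocks);

            if (repeat_id < 0)
                repeat_id = this_repeat;
            else if (repeat_id != this_repeat || nloaded == chunk_rooms)
            {
                printf("flush chunk: %lu blocks, repeat_id=%d\n",
                       (unsigned long)nloaded, repeat_id);
                nloaded = 0;
                repeat_id = this_repeat;
            }
            nloaded++;
            block_num++;
        }
        printf("flush chunk: %lu blocks, repeat_id=%d\n",
               (unsigned long)nloaded, repeat_id);
        return 0;
    }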
@@ -754,6 +782,7 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 	kds->length = kds->block_offset + BLCKSZ * kds->nitems;
 	if (kds->nitems == 0)
 		return NULL;
+	Assert(scan_repeat_id >= 0);
 	if (strom_nblocks > 0)
 	{
 		memcpy(&KDS_BLOCK_BLCKNR(kds, kds->block_nloaded),
@@ -783,6 +812,7 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
 	xcmd = (XpuCommand *)pts->xcmd_buf.data;
 	xcmd->u.task.kds_src_pathname = kds_src_pathname;
 	xcmd->u.task.kds_src_iovec = kds_src_iovec;
+	xcmd->u.task.scan_repeat_id = scan_repeat_id;
 	xcmd->length = pts->xcmd_buf.len;
 
 	xcmd_iov[0].iov_base = xcmd;
@@ -828,13 +858,15 @@ XpuCommand *
 pgstromRelScanChunkNormal(pgstromTaskState *pts,
 						  struct iovec *xcmd_iov, int *xcmd_iovcnt)
 {
-	EState		   *estate = pts->css.ss.ps.state;
-	TableScanDesc	scan = pts->css.ss.ss_currentScanDesc;
+	HeapScanDesc	h_scan = (HeapScanDesc)pts->css.ss.ss_currentScanDesc;
 	TupleTableSlot *slot = pts->base_slot;
 	kern_data_store *kds;
 	XpuCommand	   *xcmd;
+	int				kds_repeat_id = -1;
 	size_t			sz1, sz2;
 
+	__relScanInitStartBlock(pts);
+
 	pts->xcmd_buf.len = __XCMD_KDS_SRC_OFFSET(&pts->xcmd_buf) + PGSTROM_CHUNK_SIZE;
 	enlargeStringInfo(&pts->xcmd_buf, 0);
 	kds = __XCMD_GET_KDS_SRC(&pts->xcmd_buf);
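From here, pgstromRelScanChunkNormal() drives the scan block-by-block through
table_scan_bitmap_next_block()/table_scan_bitmap_next_tuple() with the
preallocated TBMIterateResult, instead of table_scan_getnextslot(), so the
same raw-index trick works for all three branches of the loop below. The
logical-to-physical block mapping can be modeled like this (standalone C;
the sizes are hypothetical, and the start-block offset mirrors the
synchronized-scan handling used in the BRIN branch):

    #include <stdint.h>
    #include <stdio.h>

    /*
     * Walk logical block numbers 0 .. N*R-1 and map each one onto a
     * physical block plus a repeat-id.
     */
    int main(void)
    {
        uint32_t nblocks = 4, startblock = 2, nrepeats = 2;

        for (uint64_t raw = 0; raw < (uint64_t)nblocks * nrepeats; raw++)
        {
            uint32_t blockno   = (uint32_t)((raw + startblock) % nblocks);
            uint32_t repeat_id = (uint32_t)(raw / nblocks);

            printf("raw=%llu -> blockno=%u repeat_id=%u\n",
                   (unsigned long long)raw, blockno, repeat_id);
        }
        return 0;
    }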
@@ -842,53 +874,108 @@ pgstromRelScanChunkNormal(pgstromTaskState *pts,
 	kds->usage = 0;
 	kds->length = PGSTROM_CHUNK_SIZE;
 
-	if (pts->br_state)
+	while (!pts->scan_done)
 	{
-		/* scan by BRIN index */
-		while (!pts->scan_done)
+		if (!TTS_EMPTY(slot))
+		{
+			Assert(pts->curr_repeat_id >= 0);
+			if (!__kds_row_insert_tuple(kds, slot))
+				break;
+			if (kds_repeat_id < 0)
+				kds_repeat_id = pts->curr_repeat_id;
+		}
+		Assert(TTS_EMPTY(slot));
+
+		if (pts->curr_repeat_id < 0)
 		{
-			if (!pts->curr_tbm)
+			/* scan by BRIN index */
+			if (pts->br_state)
 			{
-				TBMIterateResult *next_tbm = pgstromBrinIndexNextBlock(pts);
+				TBMIterateResult *tbm = pts->curr_tbm;
+				uint64_t	raw_index = pts->curr_block_num++;
 
-				if (!next_tbm)
+				if (raw_index >= pts->curr_block_tail)
+				{
+					int		__repeat_id = pgstromBrinIndexNextChunk(pts);
+
+					if (__repeat_id < 0)
+					{
+						pts->scan_done = true;
+						break;
+					}
+					raw_index = pts->curr_block_num++;
+				}
+				tbm->blockno = (raw_index + h_scan->rs_startblock) % h_scan->rs_nblocks;
+				tbm->ntuples = -1;
+				tbm->recheck = true;
+				if (!table_scan_bitmap_next_block(&h_scan->rs_base, tbm))
+					continue;
+				pts->curr_repeat_id = (raw_index / h_scan->rs_nblocks);
+			}
+			else if (!h_scan->rs_base.rs_parallel)
+			{
+				/* single process scan */
+				uint64_t	raw_index = pts->curr_block_num++;
+
+				if (raw_index < ((uint64_t)h_scan->rs_nblocks *
+								 (uint64_t)pts->num_scan_repeats))
+				{
+					TBMIterateResult *tbm = pts->curr_tbm;
+
+					tbm->blockno = (raw_index % h_scan->rs_nblocks);
+					tbm->ntuples = -1;
+					tbm->recheck = true;
+					if (!table_scan_bitmap_next_block(&h_scan->rs_base, tbm))
+						continue;
+					pts->curr_repeat_id = (raw_index / h_scan->rs_nblocks);
+				}
+				else
 				{
 					pts->scan_done = true;
 					break;
 				}
-				if (!table_scan_bitmap_next_block(scan, next_tbm))
-					elog(ERROR, "failed on table_scan_bitmap_next_block");
-				pts->curr_tbm = next_tbm;
 			}
-			if (!TTS_EMPTY(slot) &&
-				!__kds_row_insert_tuple(kds, slot))
-				break;
-			if (!table_scan_bitmap_next_tuple(scan, pts->curr_tbm, slot))
-				pts->curr_tbm = NULL;
-			else if (!__kds_row_insert_tuple(kds, slot))
-				break;
-		}
-	}
-	else
-	{
-		/* full table scan */
-		while (!pts->scan_done)
-		{
-			if (!TTS_EMPTY(slot) &&
-				!__kds_row_insert_tuple(kds, slot))
-				break;
-			if (!table_scan_getnextslot(scan, estate->es_direction, slot))
+			else
 			{
-				pts->scan_done = true;
-				break;
+				/* parallel process scan */
+				ParallelBlockTableScanDesc pb_scan =
+					(ParallelBlockTableScanDesc)h_scan->rs_base.rs_parallel;
+				uint64_t	raw_index = pg_atomic_fetch_add_u64(&pb_scan->phs_nallocated, 1);
+
+				if (raw_index < ((uint64_t)h_scan->rs_nblocks *
+								 (uint64_t)pts->num_scan_repeats))
+				{
+					TBMIterateResult *tbm = pts->curr_tbm;
+
+					tbm->blockno = (raw_index % h_scan->rs_nblocks);
+					tbm->ntuples = -1;
+					tbm->recheck = true;
+					if (!table_scan_bitmap_next_block(&h_scan->rs_base, tbm))
+						continue;
+					pts->curr_repeat_id = (raw_index / h_scan->rs_nblocks);
+				}
+				else
+				{
+					pts->scan_done = true;
+					break;
+				}
 			}
-			if (!__kds_row_insert_tuple(kds, slot))
-				break;
 		}
+		Assert(pts->curr_repeat_id >= 0);
+		if (kds_repeat_id >= 0 && kds_repeat_id != pts->curr_repeat_id)
+			break;		/* never mix different repeat_ids in the same KDS */
+		if (!table_scan_bitmap_next_tuple(&h_scan->rs_base,
+										  pts->curr_tbm,
+										  slot))
+			pts->curr_repeat_id = -1;	/* load the next block */
+		else if (!__kds_row_insert_tuple(kds, slot))
+			break;		/* KDS filled up */
+		else if (kds_repeat_id < 0)
+			kds_repeat_id = pts->curr_repeat_id;
 	}
-
 	if (kds->nitems == 0)
 		return NULL;
+	Assert(kds_repeat_id >= 0);
 
 	/* setup iovec that may skip the hole between row-index and tuples-buffer */
 	sz1 = ((KDS_BODY_ADDR(kds) - pts->xcmd_buf.data) +
@@ -898,6 +985,7 @@ pgstromRelScanChunkNormal(pgstromTaskState *pts,
 	kds->length = (KDS_HEAD_LENGTH(kds) +
 				   MAXALIGN(sizeof(uint64_t) * kds->nitems) + sz2);
 	xcmd = (XpuCommand *)pts->xcmd_buf.data;
+	xcmd->u.task.scan_repeat_id = kds_repeat_id;
 	xcmd->length = sz1 + sz2;
 
 	xcmd_iov[0].iov_base = xcmd;
 	xcmd_iov[0].iov_len = sz1;
diff --git a/src/xpu_common.h b/src/xpu_common.h
index f76b135d..843d4bc4 100644
--- a/src/xpu_common.h
+++ b/src/xpu_common.h
@@ -2505,6 +2505,7 @@ typedef struct {
 	uint32_t	kds_src_iovec;		/* offset to strom_io_vector */
 	uint32_t	kds_src_offset;		/* offset to kds_src */
 	uint32_t	kds_dst_offset;		/* offset to kds_dst */
+	int32_t		scan_repeat_id;		/* current repeat count */
 	char		data[1]				__MAXALIGNED__;
 } kern_exec_task;
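A last model for the tuple loop above: a KDS accepts tuples only while they
share one repeat-id; the first tuple of the next pass stays in the slot and
opens the next chunk. The stamped id then travels with each XpuCommand via
the new kern_exec_task field, so the device side can tell which pass a chunk
belongs to. Standalone C, with a made-up per-block repeat-id feed in place of
the real scan:

    #include <stdio.h>

    int main(void)
    {
        int blocks_repeat_id[] = {0, 0, 1, 1};  /* repeat-id of each block */
        int nblocks = 4;
        int kds_repeat_id = -1;

        for (int i = 0; i < nblocks; i++)
        {
            if (kds_repeat_id >= 0 && kds_repeat_id != blocks_repeat_id[i])
            {
                printf("flush KDS with repeat_id=%d\n", kds_repeat_id);
                kds_repeat_id = -1;     /* block i opens the next KDS */
            }
            if (kds_repeat_id < 0)
                kds_repeat_id = blocks_repeat_id[i];
            printf("add block %d tuples to KDS (repeat_id=%d)\n",
                   i, kds_repeat_id);
        }
        printf("flush KDS with repeat_id=%d\n", kds_repeat_id);
        return 0;
    }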