From 3397377b2684363a614e73f681ec12b77de81581 Mon Sep 17 00:00:00 2001 From: KaiGai Kohei Date: Thu, 26 Sep 2024 00:32:12 +0900 Subject: [PATCH] initial workable large-tables join #3 still returns inconsistent results... --- src/gpu_join.c | 9 ++---- src/gpu_service.c | 76 +++++++++++++++++++++++++++++++++++++++-------- src/relscan.c | 31 ++++++++++++------- src/xpu_common.h | 2 -- 4 files changed, 85 insertions(+), 33 deletions(-) diff --git a/src/gpu_join.c b/src/gpu_join.c index 28328a18..d7308d79 100644 --- a/src/gpu_join.c +++ b/src/gpu_join.c @@ -2116,26 +2116,21 @@ innerPreloadSetupPinnedInnerBufferPartitions(kern_multirels *h_kmrels, int divisor = (largest_sz + partition_sz - 1) / partition_sz; size_t kbuf_parts_sz = MAXALIGN(offsetof(kern_buffer_partitions, parts[divisor])); - //TODO: Phase-3 allows divisor > numGPUs restructions - if (divisor > numGpuDevAttrs) - elog(ERROR, "pinned inner-buffer partitions divisor %d larger than number of GPU devices (%d) is not supported right now\n largest_sz=%zu largest_depth=%d", divisor, numGpuDevAttrs, largest_sz, largest_depth); if (h_kmrels) { kern_buffer_partitions *kbuf_parts = (kern_buffer_partitions *) ((char *)h_kmrels + offset); memset(kbuf_parts, 0, kbuf_parts_sz); - kbuf_parts->inner_depth = largest_depth; + kbuf_parts->inner_depth = largest_depth; kbuf_parts->hash_divisor = divisor; - kbuf_parts->remainder_head = 0; - kbuf_parts->remainder_tail = Min(divisor, numGpuDevAttrs); /* assign GPUs for each partition */ for (int base=0; base < divisor; base += numGpuDevAttrs) { gpumask_t optimal_gpus = pts->optimal_gpus; gpumask_t other_gpus = (GetSystemAvailableGpus() & ~optimal_gpus); int count = 0; - int unitsz = Min(divisor, numGpuDevAttrs); + int unitsz = Min(divisor-base, numGpuDevAttrs); while ((optimal_gpus | other_gpus) != 0) { diff --git a/src/gpu_service.c b/src/gpu_service.c index 56ba3438..9b5c161a 100644 --- a/src/gpu_service.c +++ b/src/gpu_service.c @@ -1576,7 +1576,6 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, kern_buffer_partitions *kbuf_parts) { uint32_t hash_divisor = kbuf_parts->hash_divisor; - size_t kds_length = kds_head->length; CUresult rc; struct timeval tv1, tv2, tv3, tv4; @@ -1585,11 +1584,11 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, { CUdeviceptr m_kds_in; - rc = allocGpuQueryBuffer(gq_buf, &m_kds_in, kds_length); + rc = allocGpuQueryBuffer(gq_buf, &m_kds_in, kds_head->length); if (rc != CUDA_SUCCESS) { gpuClientELog(gclient, "failed on allocGpuQueryBuffer(sz=%lu): %s", - kds_length, cuStrError(rc)); + kds_head->length, cuStrError(rc)); return false; } memcpy((void *)m_kds_in, kds_head, KDS_HEAD_LENGTH(kds_head)); @@ -1663,15 +1662,18 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, { kern_data_store *kds_in = kbuf_parts->parts[i].kds_in; gpumask_t available_gpus = kbuf_parts->parts[i].available_gpus; - + size_t head_sz = (KDS_HEAD_LENGTH(kds_in) + + sizeof(uint64_t) * kds_in->hash_nslots + + sizeof(uint64_t) * kds_in->nitems); /* * migrate the inner-buffer partitions to be used in the 2nd * or later repeats into the host-memory */ if (i > numGpuDevAttrs) { +#if 1 rc = cuMemPrefetchAsync((CUdeviceptr)kds_in, - kds_length, + head_sz, CU_DEVICE_CPU, MY_STREAM_PER_THREAD); if (rc != CUDA_SUCCESS) @@ -1680,6 +1682,19 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, cuStrError(rc)); return false; } + rc = cuMemPrefetchAsync((CUdeviceptr)kds_in + + kds_in->length + - kds_in->usage, + kds_in->usage, + CU_DEVICE_CPU, + 
MY_STREAM_PER_THREAD); + if (rc != CUDA_SUCCESS) + { + gpuClientELog(gclient, "failed on cuMemPrefetchAsync(CPU): %s", + cuStrError(rc)); + return false; + } +#endif continue; } /* @@ -1697,7 +1712,16 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, __FATAL("failed on cuCtxPushCurrent: %s", cuStrError(rc)); /* Prefetch the inner-buffer */ rc = cuMemPrefetchAsync((CUdeviceptr)kds_in, - kds_length, + head_sz, + gcontext->cuda_device, + MY_STREAM_PER_THREAD); + if (rc != CUDA_SUCCESS) + gpuClientELog(gclient, "failed on cuMemPrefetchAsync: %s", + cuStrError(rc)); + rc = cuMemPrefetchAsync((CUdeviceptr)kds_in + + kds_in->length + - kds_in->usage, + kds_in->usage, gcontext->cuda_device, MY_STREAM_PER_THREAD); if (rc != CUDA_SUCCESS) @@ -1721,10 +1745,24 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, { kern_data_store *kds_in = kbuf_parts->parts[i].kds_in; gpumask_t available_gpus = kbuf_parts->parts[i].available_gpus; - + size_t head_sz = (KDS_HEAD_LENGTH(kds_in) + + sizeof(uint64_t) * kds_in->hash_nslots + + sizeof(uint64_t) * kds_in->nitems); +#if 1 /* set read-only attribute */ rc = cuMemAdvise((CUdeviceptr)kds_in, - kds_length, + head_sz, + CU_MEM_ADVISE_SET_READ_MOSTLY, -1); + if (rc != CUDA_SUCCESS) + { + gpuClientELog(gclient, "failed on cuMemAdvise(SET_READ_MOSTLY): %s", + cuStrError(rc)); + return false; + } + rc = cuMemAdvise((CUdeviceptr)kds_in + + kds_in->length + - kds_in->usage, + kds_in->usage, CU_MEM_ADVISE_SET_READ_MOSTLY, -1); if (rc != CUDA_SUCCESS) { @@ -1732,6 +1770,7 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, cuStrError(rc)); return false; } +#endif /* make duplications if any */ if (i < numGpuDevAttrs) { @@ -1753,7 +1792,16 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient, __FATAL("failed on cuCtxPushCurrent: %s", cuStrError(rc)); /* Prefetch the inner-buffer */ rc = cuMemPrefetchAsync((CUdeviceptr)kds_in, - kds_length, + head_sz, + gcontext->cuda_device, + MY_STREAM_PER_THREAD); + if (rc != CUDA_SUCCESS) + gpuClientELog(gclient, "failed on cuMemPrefetchAsync: %s", + cuStrError(rc)); + rc = cuMemPrefetchAsync((CUdeviceptr)kds_in + + kds_in->length + - kds_in->usage, + kds_in->usage, gcontext->cuda_device, MY_STREAM_PER_THREAD); if (rc != CUDA_SUCCESS) @@ -3673,9 +3721,6 @@ gpuservHandleGpuTaskExec(gpuContext *gcontext, /* * Build GPU kernel execution plan, if pinned inner-buffer is * partitioned to multiple GPUs. 
- * - * - * */ if (gq_buf && gq_buf->h_kmrels) { @@ -3685,11 +3730,16 @@ gpuservHandleGpuTaskExec(gpuContext *gcontext, if (kbuf_parts) { + int32_t repeat_id = xcmd->u.task.scan_repeat_id; + int32_t start = repeat_id * numGpuDevAttrs; + int32_t end = Min(start + numGpuDevAttrs, + kbuf_parts->hash_divisor); part_divisor = kbuf_parts->hash_divisor; part_gcontexts = alloca(sizeof(gpuContext *) * part_divisor); part_reminders = alloca(sizeof(uint32_t) * part_divisor); - for (int k=0; k < part_divisor; k++) + assert(start < end); + for (int k=start; k < end; k++) { gpumask_t part_mask = kbuf_parts->parts[k].available_gpus; diff --git a/src/relscan.c b/src/relscan.c index 13bc3c08..af0ad074 100644 --- a/src/relscan.c +++ b/src/relscan.c @@ -550,7 +550,6 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts, { pgstromSharedState *ps_state = pts->ps_state; Relation relation = pts->css.ss.ss_currentRelation; - HeapScanDesc h_scan = (HeapScanDesc)pts->css.ss.ss_currentScanDesc; SMgrRelation smgr = RelationGetSmgr(relation); XpuCommand *xcmd; kern_data_store *kds; @@ -589,7 +588,8 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts, kds->nitems < kds_nrooms) { BlockNumber block_num - = (pts->curr_block_num + h_scan->rs_startblock) % h_scan->rs_nblocks; + = (pts->curr_block_num + + ps_state->scan_block_start) % ps_state->scan_block_nums; /* * MEMO: Usually, CPU is (much) more powerful than DPUs. * In case when the source cache is already on the shared- @@ -626,8 +626,8 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts, * because it shall be JOIN'ed on different partitions. */ if (scan_repeat_id < 0) - scan_repeat_id = pts->curr_block_num / h_scan->rs_nblocks; - else if (scan_repeat_id != pts->curr_block_num / h_scan->rs_nblocks) + scan_repeat_id = pts->curr_block_num / ps_state->scan_block_nums; + else if (scan_repeat_id != pts->curr_block_num / ps_state->scan_block_nums) goto out; /* @@ -712,16 +712,18 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts, uint32_t num_blocks = kds_nrooms - kds->nitems; uint64_t scan_block_limit = (ps_state->scan_block_nums * pts->num_scan_repeats); + uint64_t scan_block_count; - pts->curr_block_num = pg_atomic_fetch_add_u64(&ps_state->scan_block_count, - num_blocks); - if (pts->curr_block_num >= scan_block_limit) + scan_block_count = pg_atomic_fetch_add_u64(&ps_state->scan_block_count, + num_blocks); + if (scan_block_count >= scan_block_limit) pts->scan_done = true; else { - if (pts->curr_block_num + num_blocks > scan_block_limit) - num_blocks = h_scan->rs_nblocks - pts->curr_block_num; + pts->curr_block_num = (scan_block_count % ps_state->scan_block_nums); pts->curr_block_tail = pts->curr_block_num + num_blocks; + if (pts->curr_block_tail > ps_state->scan_block_nums) + pts->curr_block_tail = ps_state->scan_block_nums; } } } @@ -735,9 +737,16 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts, Assert(scan_repeat_id >= 0); /* XXX - debug message */ if (scan_repeat_id > 0 && scan_repeat_id != pts->last_repeat_id) - elog(NOTICE, "direct scan on '%s' moved into %dth loop for inner-buffer partitions (pid: %u)", + elog(NOTICE, "direct scan on '%s' moved into %dth loop for inner-buffer partitions (pid: %u) scan_block_count=%lu scan_block_nums=%u scan_block_start=%u num_scan_repeats=%u curr_repeat_id=%d curr_block_num=%lu curr_block_tail=%lu", RelationGetRelationName(pts->css.ss.ss_currentRelation), - scan_repeat_id+1, MyProcPid); + scan_repeat_id+1, MyProcPid, + pg_atomic_read_u64(&ps_state->scan_block_count), + ps_state->scan_block_nums, + ps_state->scan_block_start, + 
pts->num_scan_repeats, + pts->curr_repeat_id, + pts->curr_block_num, + pts->curr_block_tail); pts->last_repeat_id = scan_repeat_id; if (strom_nblocks > 0) diff --git a/src/xpu_common.h b/src/xpu_common.h index 843d4bc4..c4ce1001 100644 --- a/src/xpu_common.h +++ b/src/xpu_common.h @@ -3050,8 +3050,6 @@ typedef struct { int32_t inner_depth; /* partitioned depth */ int32_t hash_divisor; /* divisor for the hash-value */ - int32_t remainder_head; /* smallest remainder to be processed */ - int32_t remainder_tail; /* largest remainder to be processed */ struct { gpumask_t available_gpus; /* set of GPUs for this partition */ kern_data_store *kds_in; /* used by GPU-service */
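---
Notes (illustrative sketches with hypothetical names; not part of the patch itself):

Two-region prefetch of a partitioned inner buffer: __setupGpuJoinPinnedInnerBufferPartitioned() now touches each kern_data_store in two pieces instead of the whole kds_length range. The front piece covers the header plus the hash-slot and row-index arrays (KDS_HEAD_LENGTH() plus 8 bytes per hash slot and per item), the back piece covers the last `usage` bytes where the tuple bodies are packed, so the unused hole in the middle is never migrated. Partitions needed only by a later scan repeat are prefetched out to CU_DEVICE_CPU, the current repeat's partitions are prefetched onto their assigned GPU, and both regions are marked CU_MEM_ADVISE_SET_READ_MOSTLY. The standalone sketch below mirrors that pattern against the CUDA driver API; kds_dummy_t and kds_head_size() are made-up stand-ins for kern_data_store / KDS_HEAD_LENGTH(), and it assumes a Linux host whose GPU supports concurrent managed access.

    /* two_region_prefetch.c -- build with: cc two_region_prefetch.c -lcuda */
    #include <cuda.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* simplified stand-in for kern_data_store; only the fields the sketch needs */
    typedef struct
    {
        size_t      length;         /* total buffer length */
        size_t      usage;          /* bytes packed at the buffer tail */
        uint64_t    nitems;         /* number of row-index entries */
        uint64_t    hash_nslots;    /* number of hash slots */
    } kds_dummy_t;

    #define CHECK(rc)                                               \
        do {                                                        \
            if ((rc) != CUDA_SUCCESS)                               \
            {                                                       \
                const char *emsg;                                   \
                cuGetErrorString((rc), &emsg);                      \
                fprintf(stderr, "CUDA error: %s\n", emsg);          \
                exit(1);                                            \
            }                                                       \
        } while(0)

    /* front region: header + hash-slots + row-index (stand-in for KDS_HEAD_LENGTH) */
    static size_t
    kds_head_size(const kds_dummy_t *kds)
    {
        return sizeof(kds_dummy_t)
            + sizeof(uint64_t) * kds->hash_nslots
            + sizeof(uint64_t) * kds->nitems;
    }

    /* prefetch only the populated regions; the hole in the middle stays put */
    static void
    prefetch_kds_regions(CUdeviceptr m_kds, const kds_dummy_t *kds,
                         CUdevice dst_device, CUstream stream)
    {
        CHECK(cuMemPrefetchAsync(m_kds, kds_head_size(kds),
                                 dst_device, stream));
        CHECK(cuMemPrefetchAsync(m_kds + kds->length - kds->usage,
                                 kds->usage, dst_device, stream));
    }

    /* the read-mostly advice uses exactly the same two-region split */
    static void
    advise_kds_readmostly(CUdeviceptr m_kds, const kds_dummy_t *kds)
    {
        CHECK(cuMemAdvise(m_kds, kds_head_size(kds),
                          CU_MEM_ADVISE_SET_READ_MOSTLY, CU_DEVICE_CPU));
        CHECK(cuMemAdvise(m_kds + kds->length - kds->usage, kds->usage,
                          CU_MEM_ADVISE_SET_READ_MOSTLY, CU_DEVICE_CPU));
    }

    int main(void)
    {
        CUdevice    dev;
        CUcontext   ctx;
        CUdeviceptr m_kds;
        kds_dummy_t kds = { .length = 64UL << 20, .usage = 16UL << 20,
                            .nitems = 100000, .hash_nslots = 131072 };

        CHECK(cuInit(0));
        CHECK(cuDeviceGet(&dev, 0));
        CHECK(cuCtxCreate(&ctx, 0, dev));
        CHECK(cuMemAllocManaged(&m_kds, kds.length, CU_MEM_ATTACH_GLOBAL));
        memcpy((void *)m_kds, &kds, sizeof(kds));   /* header at the front */

        advise_kds_readmostly(m_kds, &kds);
        prefetch_kds_regions(m_kds, &kds, dev, NULL);           /* current repeat */
        prefetch_kds_regions(m_kds, &kds, CU_DEVICE_CPU, NULL); /* deferred repeat */
        CHECK(cuCtxSynchronize());
        CHECK(cuMemFree(m_kds));
        return 0;
    }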
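Partition window per scan repeat: gpuservHandleGpuTaskExec() now derives the range of partitions to process from xcmd->u.task.scan_repeat_id, so repeat r covers partitions [r * numGpuDevAttrs, Min((r+1) * numGpuDevAttrs, hash_divisor)), which is also why remainder_head/remainder_tail could be dropped from kern_buffer_partitions. The same grouping is behind the unitsz fix in innerPreloadSetupPinnedInnerBufferPartitions(): the last group only holds divisor - base partitions when the divisor is not a multiple of the GPU count. A small plain-C sketch of that arithmetic (hypothetical names, standalone):

    #include <stdio.h>

    #define Min(a,b)        ((a) < (b) ? (a) : (b))

    /* number of scan repeats needed to cover every partition once */
    static int
    num_scan_repeats(int hash_divisor, int num_gpus)
    {
        return (hash_divisor + num_gpus - 1) / num_gpus;
    }

    int main(void)
    {
        int     hash_divisor = 7;   /* inner buffer split into 7 partitions */
        int     num_gpus     = 3;   /* numGpuDevAttrs */

        for (int repeat_id = 0;
             repeat_id < num_scan_repeats(hash_divisor, num_gpus);
             repeat_id++)
        {
            int     start  = repeat_id * num_gpus;
            int     end    = Min(start + num_gpus, hash_divisor);
            int     unitsz = end - start;   /* == Min(hash_divisor - start, num_gpus) */

            printf("repeat %d: partitions [%d..%d), %d partition(s) mapped 1:1 onto GPUs\n",
                   repeat_id, start, end, unitsz);
        }
        return 0;
    }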
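Block bookkeeping for repeated direct scans: pgstromRelScanChunkDirect() no longer reads the window geometry off HeapScanDesc; the shared scan_block_count just grows monotonically up to scan_block_nums * num_scan_repeats, each chunk takes its in-heap position from the wrapped counter (count % scan_block_nums, shifted by scan_block_start), and the repeat a chunk belongs to falls out of the same counter (count / scan_block_nums in the sketch below). This is a simplified model of the bookkeeping this hunk moves toward, with C11 atomics standing in for pg_atomic_uint64 and hypothetical names; the real code spreads the state across pgstromSharedState and pgstromTaskState.

    /* scan_chunks.c -- build with: cc -std=c11 scan_chunks.c */
    #include <inttypes.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    #define Min(a,b)        ((a) < (b) ? (a) : (b))

    /* shared across workers (stand-in for the relevant pgstromSharedState fields) */
    typedef struct
    {
        _Atomic uint64_t scan_block_count;  /* grows monotonically, never wraps */
        uint32_t    scan_block_nums;        /* number of blocks to scan */
        uint32_t    scan_block_start;       /* synchronized start block */
        uint32_t    num_scan_repeats;       /* one pass per partition group */
    } shared_scan_state;

    /*
     * Grab the next chunk of blocks.  Returns 0 once every repeat is exhausted,
     * otherwise fills the in-heap window and the repeat it belongs to.
     */
    static int
    next_scan_chunk(shared_scan_state *ss, uint32_t num_blocks,
                    uint64_t *block_num, uint64_t *block_tail, int32_t *repeat_id)
    {
        uint64_t    limit = (uint64_t)ss->scan_block_nums * ss->num_scan_repeats;
        uint64_t    count = atomic_fetch_add(&ss->scan_block_count, num_blocks);

        if (count >= limit)
            return 0;       /* scan done */
        *repeat_id  = (int32_t)(count / ss->scan_block_nums);
        *block_num  = count % ss->scan_block_nums;
        *block_tail = Min(*block_num + num_blocks, ss->scan_block_nums);
        return 1;
    }

    /* map the logical position onto a physical block number */
    static uint32_t
    physical_block(const shared_scan_state *ss, uint64_t block_num)
    {
        return (block_num + ss->scan_block_start) % ss->scan_block_nums;
    }

    int main(void)
    {
        shared_scan_state ss = { 0, 1000, 123, 2 };     /* 1000 blocks, 2 repeats */
        uint64_t    head, tail;
        int32_t     repeat;

        while (next_scan_chunk(&ss, 250, &head, &tail, &repeat))
            printf("repeat %d: blocks [%" PRIu64 "..%" PRIu64 ") -> physical block %u\n",
                   repeat, head, tail, physical_block(&ss, head));
        return 0;
    }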
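Sizing the partition descriptor: with remainder_head/remainder_tail gone, kern_buffer_partitions is just the partitioned depth, the hash divisor, and one {available_gpus, kds_in} entry per partition, and innerPreloadSetupPinnedInnerBufferPartitions() sizes it as MAXALIGN(offsetof(kern_buffer_partitions, parts[divisor])). A trimmed-down illustration of that variable-length layout (simplified types, not the real definition; relies on the GCC/Clang extension that offsetof accepts a runtime array index, as the patch itself does):

    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef uint64_t gpumask_t;

    #define MAXIMUM_ALIGNOF 8
    #define MAXALIGN(LEN)   \
        (((uintptr_t)(LEN) + MAXIMUM_ALIGNOF - 1) & ~((uintptr_t)(MAXIMUM_ALIGNOF - 1)))

    /* trimmed-down stand-in for kern_buffer_partitions */
    typedef struct
    {
        int32_t     inner_depth;        /* partitioned depth */
        int32_t     hash_divisor;       /* divisor for the hash-value */
        struct {
            gpumask_t   available_gpus; /* set of GPUs for this partition */
            void       *kds_in;         /* per-partition inner buffer */
        } parts[];                      /* variable length, one entry per partition */
    } buffer_partitions_t;

    int main(void)
    {
        int     divisor = 7;
        size_t  sz = MAXALIGN(offsetof(buffer_partitions_t, parts[divisor]));
        buffer_partitions_t *kbuf_parts = calloc(1, sz);

        kbuf_parts->inner_depth  = 2;
        kbuf_parts->hash_divisor = divisor;
        for (int k = 0; k < divisor; k++)
            kbuf_parts->parts[k].available_gpus = (1UL << (k % 3));  /* 3 GPUs, round-robin */

        printf("descriptor size for %d partitions: %zu bytes\n", divisor, sz);
        free(kbuf_parts);
        return 0;
    }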