initial workable large-tables join #3
still returns inconsistent results...
kaigai committed Sep 25, 2024
1 parent 697ef24 commit 3397377
Showing 4 changed files with 85 additions and 33 deletions.
9 changes: 2 additions & 7 deletions src/gpu_join.c
@@ -2116,26 +2116,21 @@ innerPreloadSetupPinnedInnerBufferPartitions(kern_multirels *h_kmrels,
int divisor = (largest_sz + partition_sz - 1) / partition_sz;
size_t kbuf_parts_sz = MAXALIGN(offsetof(kern_buffer_partitions,
parts[divisor]));
//TODO: Phase-3 allows divisor > numGPUs (lifts this restriction)
if (divisor > numGpuDevAttrs)
elog(ERROR, "pinned inner-buffer partitions divisor %d larger than number of GPU devices (%d) is not supported right now\n largest_sz=%zu largest_depth=%d", divisor, numGpuDevAttrs, largest_sz, largest_depth);
if (h_kmrels)
{
kern_buffer_partitions *kbuf_parts = (kern_buffer_partitions *)
((char *)h_kmrels + offset);

memset(kbuf_parts, 0, kbuf_parts_sz);
kbuf_parts->inner_depth = largest_depth;
kbuf_parts->inner_depth = largest_depth;
kbuf_parts->hash_divisor = divisor;
kbuf_parts->remainder_head = 0;
kbuf_parts->remainder_tail = Min(divisor, numGpuDevAttrs);
/* assign GPUs for each partition */
for (int base=0; base < divisor; base += numGpuDevAttrs)
{
gpumask_t optimal_gpus = pts->optimal_gpus;
gpumask_t other_gpus = (GetSystemAvailableGpus() & ~optimal_gpus);
int count = 0;
int unitsz = Min(divisor, numGpuDevAttrs);
int unitsz = Min(divisor-base, numGpuDevAttrs);

while ((optimal_gpus | other_gpus) != 0)
{
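For readers following the hunk above: the partition count is the ceiling of the largest pinned inner-buffer size over the per-partition budget, and GPUs are assigned to partitions in groups of at most numGpuDevAttrs. The one-line fix replaces Min(divisor, numGpuDevAttrs) with Min(divisor - base, numGpuDevAttrs) so that the final, possibly short, group is not treated as a full one. The sketch below is illustrative only (stand-alone C, assumed sizes, plain integers instead of gpumask_t); it is not the PG-Strom code itself.

/* Sketch of the partition arithmetic in
 * innerPreloadSetupPinnedInnerBufferPartitions(); all sizes are assumptions. */
#include <stdio.h>

#define Min(a,b)    ((a) < (b) ? (a) : (b))

int
main(void)
{
    size_t  largest_sz     = 90UL << 30;    /* assumed: 90GB largest inner buffer */
    size_t  partition_sz   = 16UL << 30;    /* assumed: 16GB per-partition budget */
    int     numGpuDevAttrs = 4;             /* assumed: 4 GPUs installed */
    int     divisor = (largest_sz + partition_sz - 1) / partition_sz;   /* = 6 */

    for (int base=0; base < divisor; base += numGpuDevAttrs)
    {
        /* the old Min(divisor, numGpuDevAttrs) would also report 4 for the
         * second group, although only 2 partitions remain after base=4 */
        int     unitsz = Min(divisor - base, numGpuDevAttrs);

        printf("group base=%d -> %d partition(s)\n", base, unitsz);
    }
    return 0;
}

With the assumed sizes this prints groups of 4 and 2 partitions; the per-partition assignment loop then draws GPUs from optimal_gpus and the remaining available GPUs (its body is collapsed in this view).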
76 changes: 63 additions & 13 deletions src/gpu_service.c
@@ -1576,7 +1576,6 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
kern_buffer_partitions *kbuf_parts)
{
uint32_t hash_divisor = kbuf_parts->hash_divisor;
size_t kds_length = kds_head->length;
CUresult rc;
struct timeval tv1, tv2, tv3, tv4;

@@ -1585,11 +1584,11 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
{
CUdeviceptr m_kds_in;

rc = allocGpuQueryBuffer(gq_buf, &m_kds_in, kds_length);
rc = allocGpuQueryBuffer(gq_buf, &m_kds_in, kds_head->length);
if (rc != CUDA_SUCCESS)
{
gpuClientELog(gclient, "failed on allocGpuQueryBuffer(sz=%lu): %s",
kds_length, cuStrError(rc));
kds_head->length, cuStrError(rc));
return false;
}
memcpy((void *)m_kds_in, kds_head, KDS_HEAD_LENGTH(kds_head));
@@ -1663,15 +1662,18 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
{
kern_data_store *kds_in = kbuf_parts->parts[i].kds_in;
gpumask_t available_gpus = kbuf_parts->parts[i].available_gpus;

size_t head_sz = (KDS_HEAD_LENGTH(kds_in) +
sizeof(uint64_t) * kds_in->hash_nslots +
sizeof(uint64_t) * kds_in->nitems);
/*
* migrate the inner-buffer partitions to be used in the 2nd
* or later repeats into the host-memory
*/
if (i > numGpuDevAttrs)
{
#if 1
rc = cuMemPrefetchAsync((CUdeviceptr)kds_in,
kds_length,
head_sz,
CU_DEVICE_CPU,
MY_STREAM_PER_THREAD);
if (rc != CUDA_SUCCESS)
Expand All @@ -1680,6 +1682,19 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
cuStrError(rc));
return false;
}
rc = cuMemPrefetchAsync((CUdeviceptr)kds_in
+ kds_in->length
- kds_in->usage,
kds_in->usage,
CU_DEVICE_CPU,
MY_STREAM_PER_THREAD);
if (rc != CUDA_SUCCESS)
{
gpuClientELog(gclient, "failed on cuMemPrefetchAsync(CPU): %s",
cuStrError(rc));
return false;
}
#endif
continue;
}
/*
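A note on the cuMemPrefetchAsync calls introduced above: instead of prefetching the whole inner buffer by its total length (the removed kds_length), the code now touches only the two populated regions of a kern_data_store, the head (header, hash slots, and row index, i.e. the head_sz expression) and the tail "usage" area that grows backwards from the end of the buffer. The helper below is an illustrative factoring of that pattern, not code from the commit; the CUDA driver calls and the kds_in fields are taken from the diff, while the helper name and its placement are assumptions. It presumes <cuda.h> and the PG-Strom header that defines kern_data_store / KDS_HEAD_LENGTH (xpu_common.h) are in scope.

/* Illustrative helper, not part of the commit: prefetch only the populated
 * head and tail regions of a kern_data_store. */
static CUresult
__prefetchKdsPopulatedRegions(kern_data_store *kds_in,
                              CUdevice dst_device,
                              CUstream stream)
{
    /* header + hash-slot array + row-index array (head_sz in the diff) */
    size_t      head_sz = (KDS_HEAD_LENGTH(kds_in) +
                           sizeof(uint64_t) * kds_in->hash_nslots +
                           sizeof(uint64_t) * kds_in->nitems);
    CUresult    rc;

    /* head region: [kds_in, kds_in + head_sz) */
    rc = cuMemPrefetchAsync((CUdeviceptr)kds_in,
                            head_sz,
                            dst_device,
                            stream);
    if (rc != CUDA_SUCCESS)
        return rc;
    /* tail region: the 'usage' bytes that grow from the end of the buffer,
     * i.e. [kds_in + length - usage, kds_in + length) */
    return cuMemPrefetchAsync((CUdeviceptr)kds_in
                              + kds_in->length
                              - kds_in->usage,
                              kds_in->usage,
                              dst_device,
                              stream);
}

Partitions that are only joined in the second or later repeats are pushed to host memory (dst_device = CU_DEVICE_CPU), while the partitions of the current repeat are prefetched to their assigned GPU; the same head/tail split is applied to the cuMemAdvise(CU_MEM_ADVISE_SET_READ_MOSTLY) calls further down.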
@@ -1697,7 +1712,16 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
__FATAL("failed on cuCtxPushCurrent: %s", cuStrError(rc));
/* Prefetch the inner-buffer */
rc = cuMemPrefetchAsync((CUdeviceptr)kds_in,
kds_length,
head_sz,
gcontext->cuda_device,
MY_STREAM_PER_THREAD);
if (rc != CUDA_SUCCESS)
gpuClientELog(gclient, "failed on cuMemPrefetchAsync: %s",
cuStrError(rc));
rc = cuMemPrefetchAsync((CUdeviceptr)kds_in
+ kds_in->length
- kds_in->usage,
kds_in->usage,
gcontext->cuda_device,
MY_STREAM_PER_THREAD);
if (rc != CUDA_SUCCESS)
@@ -1721,17 +1745,32 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
{
kern_data_store *kds_in = kbuf_parts->parts[i].kds_in;
gpumask_t available_gpus = kbuf_parts->parts[i].available_gpus;

size_t head_sz = (KDS_HEAD_LENGTH(kds_in) +
sizeof(uint64_t) * kds_in->hash_nslots +
sizeof(uint64_t) * kds_in->nitems);
#if 1
/* set read-only attribute */
rc = cuMemAdvise((CUdeviceptr)kds_in,
kds_length,
head_sz,
CU_MEM_ADVISE_SET_READ_MOSTLY, -1);
if (rc != CUDA_SUCCESS)
{
gpuClientELog(gclient, "failed on cuMemAdvise(SET_READ_MOSTLY): %s",
cuStrError(rc));
return false;
}
rc = cuMemAdvise((CUdeviceptr)kds_in
+ kds_in->length
- kds_in->usage,
kds_in->usage,
CU_MEM_ADVISE_SET_READ_MOSTLY, -1);
if (rc != CUDA_SUCCESS)
{
gpuClientELog(gclient, "failed on cuMemAdvise(SET_READ_MOSTLY): %s",
cuStrError(rc));
return false;
}
#endif
/* make duplications if any */
if (i < numGpuDevAttrs)
{
@@ -1753,7 +1792,16 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
__FATAL("failed on cuCtxPushCurrent: %s", cuStrError(rc));
/* Prefetch the inner-buffer */
rc = cuMemPrefetchAsync((CUdeviceptr)kds_in,
kds_length,
head_sz,
gcontext->cuda_device,
MY_STREAM_PER_THREAD);
if (rc != CUDA_SUCCESS)
gpuClientELog(gclient, "failed on cuMemPrefetchAsync: %s",
cuStrError(rc));
rc = cuMemPrefetchAsync((CUdeviceptr)kds_in
+ kds_in->length
- kds_in->usage,
kds_in->usage,
gcontext->cuda_device,
MY_STREAM_PER_THREAD);
if (rc != CUDA_SUCCESS)
@@ -3673,9 +3721,6 @@ gpuservHandleGpuTaskExec(gpuContext *gcontext,
/*
* Build GPU kernel execution plan, if pinned inner-buffer is
* partitioned to multiple GPUs.
*
*
*
*/
if (gq_buf && gq_buf->h_kmrels)
{
Expand All @@ -3685,11 +3730,16 @@ gpuservHandleGpuTaskExec(gpuContext *gcontext,

if (kbuf_parts)
{
int32_t repeat_id = xcmd->u.task.scan_repeat_id;
int32_t start = repeat_id * numGpuDevAttrs;
int32_t end = Min(start + numGpuDevAttrs,
kbuf_parts->hash_divisor);
part_divisor = kbuf_parts->hash_divisor;
part_gcontexts = alloca(sizeof(gpuContext *) * part_divisor);
part_reminders = alloca(sizeof(uint32_t) * part_divisor);

for (int k=0; k < part_divisor; k++)
assert(start < end);
for (int k=start; k < end; k++)
{
gpumask_t part_mask = kbuf_parts->parts[k].available_gpus;

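For the gpuservHandleGpuTaskExec() hunk just above: each scan repeat now works only on the window of inner-buffer partitions that fits on the installed GPUs, so repeat 0 handles partitions [0, numGpuDevAttrs), repeat 1 the next slice, and the last window is clamped to hash_divisor. This windowing appears to replace the remainder_head/remainder_tail fields removed from kern_buffer_partitions below. A small stand-alone sketch (values assumed, not taken from a real run):

/* Sketch of the per-repeat partition window; values are assumptions. */
#include <stdio.h>

#define Min(a,b)    ((a) < (b) ? (a) : (b))

int
main(void)
{
    int     numGpuDevAttrs = 4;     /* assumed GPU count */
    int     hash_divisor   = 6;     /* assumed partition count */
    int     num_repeats    = (hash_divisor + numGpuDevAttrs - 1) / numGpuDevAttrs;

    for (int repeat_id=0; repeat_id < num_repeats; repeat_id++)
    {
        int     start = repeat_id * numGpuDevAttrs;
        int     end   = Min(start + numGpuDevAttrs, hash_divisor);

        printf("scan repeat %d joins partitions [%d, %d)\n",
               repeat_id, start, end);
    }
    return 0;
}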
31 changes: 20 additions & 11 deletions src/relscan.c
@@ -550,7 +550,6 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
{
pgstromSharedState *ps_state = pts->ps_state;
Relation relation = pts->css.ss.ss_currentRelation;
HeapScanDesc h_scan = (HeapScanDesc)pts->css.ss.ss_currentScanDesc;
SMgrRelation smgr = RelationGetSmgr(relation);
XpuCommand *xcmd;
kern_data_store *kds;
@@ -589,7 +588,8 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
kds->nitems < kds_nrooms)
{
BlockNumber block_num
= (pts->curr_block_num + h_scan->rs_startblock) % h_scan->rs_nblocks;
= (pts->curr_block_num +
ps_state->scan_block_start) % ps_state->scan_block_nums;
/*
* MEMO: Usually, CPU is (much) more powerful than DPUs.
* In case when the source cache is already on the shared-
@@ -626,8 +626,8 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
* because it shall be JOIN'ed on different partitions.
*/
if (scan_repeat_id < 0)
scan_repeat_id = pts->curr_block_num / h_scan->rs_nblocks;
else if (scan_repeat_id != pts->curr_block_num / h_scan->rs_nblocks)
scan_repeat_id = pts->curr_block_num / ps_state->scan_block_nums;
else if (scan_repeat_id != pts->curr_block_num / ps_state->scan_block_nums)
goto out;

/*
@@ -712,16 +712,18 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
uint32_t num_blocks = kds_nrooms - kds->nitems;
uint64_t scan_block_limit = (ps_state->scan_block_nums *
pts->num_scan_repeats);
uint64_t scan_block_count;

pts->curr_block_num = pg_atomic_fetch_add_u64(&ps_state->scan_block_count,
num_blocks);
if (pts->curr_block_num >= scan_block_limit)
scan_block_count = pg_atomic_fetch_add_u64(&ps_state->scan_block_count,
num_blocks);
if (scan_block_count >= scan_block_limit)
pts->scan_done = true;
else
{
if (pts->curr_block_num + num_blocks > scan_block_limit)
num_blocks = h_scan->rs_nblocks - pts->curr_block_num;
pts->curr_block_num = (scan_block_count % ps_state->scan_block_nums);
pts->curr_block_tail = pts->curr_block_num + num_blocks;
if (pts->curr_block_tail > ps_state->scan_block_nums)
pts->curr_block_tail = ps_state->scan_block_nums;
}
}
}
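The block-accounting changes in this file replace the HeapScanDesc-derived numbers (rs_startblock, rs_nblocks) with the shared-state counters: a worker atomically claims num_blocks positions from scan_block_count, and the claimed position, which keeps growing across repeats up to scan_block_nums * num_scan_repeats, is folded back into a physical block range with a modulo over scan_block_nums and clamped at the relation boundary. The stand-alone sketch below shows that intent with plain C11 atomics in place of pg_atomic_fetch_add_u64; the constants are assumptions, and the repeat id is derived here from the raw cursor for clarity, so it is not the literal PG-Strom code.

/* Illustrative sketch of the shared scan-block accounting; not PG-Strom code. */
#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    _Atomic uint64_t scan_block_count = 0;  /* shared cursor (assumed initial value) */
    uint32_t    scan_block_nums  = 1000;    /* assumed: blocks in the relation */
    uint32_t    scan_block_start = 250;     /* assumed: synchronized start block */
    int         num_scan_repeats = 3;       /* one repeat per partition window */
    uint32_t    num_blocks       = 400;     /* blocks claimed per chunk */
    uint64_t    scan_block_limit = (uint64_t)scan_block_nums * num_scan_repeats;

    for (;;)
    {
        uint64_t    curr = atomic_fetch_add(&scan_block_count, num_blocks);

        if (curr >= scan_block_limit)
            break;                          /* pts->scan_done */

        int         repeat_id  = (int)(curr / scan_block_nums);
        uint64_t    block_num  = curr % scan_block_nums;    /* curr_block_num */
        uint64_t    block_tail = block_num + num_blocks;    /* curr_block_tail */

        if (block_tail > scan_block_nums)
            block_tail = scan_block_nums;   /* clamp at the relation boundary */
        /* first physical block, rotated by the synchronized start position */
        uint64_t    phys = (block_num + scan_block_start) % scan_block_nums;

        printf("repeat=%d claims [%" PRIu64 ", %" PRIu64 ") first physical block=%" PRIu64 "\n",
               repeat_id, block_num, block_tail, phys);
    }
    return 0;
}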
@@ -735,9 +737,16 @@ pgstromRelScanChunkDirect(pgstromTaskState *pts,
Assert(scan_repeat_id >= 0);
/* XXX - debug message */
if (scan_repeat_id > 0 && scan_repeat_id != pts->last_repeat_id)
elog(NOTICE, "direct scan on '%s' moved into %dth loop for inner-buffer partitions (pid: %u)",
elog(NOTICE, "direct scan on '%s' moved into %dth loop for inner-buffer partitions (pid: %u) scan_block_count=%lu scan_block_nums=%u scan_block_start=%u num_scan_repeats=%u curr_repeat_id=%d curr_block_num=%lu curr_block_tail=%lu",
RelationGetRelationName(pts->css.ss.ss_currentRelation),
scan_repeat_id+1, MyProcPid);
scan_repeat_id+1, MyProcPid,
pg_atomic_read_u64(&ps_state->scan_block_count),
ps_state->scan_block_nums,
ps_state->scan_block_start,
pts->num_scan_repeats,
pts->curr_repeat_id,
pts->curr_block_num,
pts->curr_block_tail);
pts->last_repeat_id = scan_repeat_id;

if (strom_nblocks > 0)
2 changes: 0 additions & 2 deletions src/xpu_common.h
@@ -3050,8 +3050,6 @@ typedef struct
{
int32_t inner_depth; /* partitioned depth */
int32_t hash_divisor; /* divisor for the hash-value */
int32_t remainder_head; /* smallest remainder to be processed */
int32_t remainder_tail; /* largest remainder to be processed */
struct {
gpumask_t available_gpus; /* set of GPUs for this partition */
kern_data_store *kds_in; /* used by GPU-service */
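kern_buffer_partitions ends with a variable-length per-partition array, which is why gpu_join.c sizes it as MAXALIGN(offsetof(kern_buffer_partitions, parts[divisor])); the removed remainder_head/remainder_tail fields are no longer needed once the per-repeat window is computed in gpu_service.c. The sketch below shows the sizing idiom with a simplified stand-in struct and a PostgreSQL-style MAXALIGN stand-in; it is not the real definition, and the variable array index in offsetof relies on GCC/Clang's __builtin_offsetof, the same idiom the original code uses.

/* Illustrative sizing of a flexible-array header, mirroring
 * MAXALIGN(offsetof(kern_buffer_partitions, parts[divisor])) in gpu_join.c. */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define MAXIMUM_ALIGNOF  8
#define MAXALIGN(LEN) \
    (((uintptr_t)(LEN) + (MAXIMUM_ALIGNOF - 1)) & ~((uintptr_t)(MAXIMUM_ALIGNOF - 1)))

typedef struct
{
    int32_t inner_depth;            /* partitioned depth */
    int32_t hash_divisor;           /* divisor for the hash-value */
    struct {
        uint64_t available_gpus;    /* stand-in for gpumask_t */
        void    *kds_in;            /* stand-in for kern_data_store * */
    } parts[];                      /* variable length, indexed by remainder */
} kbuf_parts_demo;

int
main(void)
{
    int     divisor = 6;            /* assumed partition count */
    size_t  kbuf_parts_sz = MAXALIGN(offsetof(kbuf_parts_demo, parts[divisor]));

    printf("header size for %d partitions: %zu bytes\n", divisor, kbuf_parts_sz);
    return 0;
}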
