update the logic for inner-buffer partitioning
Now we build the partitioned inner-buffer at once, so that the
final buffers of GpuScan can be released as early as possible.
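
The single-pass scheme routes each row to partition hash % hash_divisor: a thread block first counts its rows per partition in shared memory, reserves a contiguous range in every destination buffer with one atomic per partition, then scatters the rows. The CUDA sketch below is a minimal, standalone illustration of that pattern only; every identifier in it (partition_rows, NPARTS, part_nitems, ...) is invented for the example, and the real kernel, kern_buffer_partitioning in the diff below, additionally copies the tuple bytes and links hash chains.

// Standalone sketch of the single-pass partitioning pattern used by the
// new kernel below. All names here are illustrative, not PG-Strom's.
#include <cstdio>
#include <cuda_runtime.h>

#define NPARTS      4       /* stands in for hash_divisor */
#define THREADS     256

__global__ void
partition_rows(const unsigned int *hashes, int nitems,
               unsigned int *part_nitems,   /* global counter per partition */
               unsigned int *part_rowid)    /* output: slot of each row */
{
    __shared__ unsigned int local_cnt[NPARTS];   /* this block's rows, per partition */
    __shared__ unsigned int local_base[NPARTS];  /* range reserved in global buffer */
    int     index = blockIdx.x * blockDim.x + threadIdx.x;
    int     part_id = -1;
    unsigned int row_id = 0;

    /* reset block-local counters */
    for (int p = threadIdx.x; p < NPARTS; p += blockDim.x)
        local_cnt[p] = 0;
    __syncthreads();

    /* phase 1: count this block's rows per partition in shared memory */
    if (index < nitems)
    {
        part_id = hashes[index] % NPARTS;
        row_id = atomicAdd(&local_cnt[part_id], 1U);
    }
    __syncthreads();

    /* phase 2: a single atomic per partition reserves a contiguous range */
    if (threadIdx.x < NPARTS)
        local_base[threadIdx.x] = atomicAdd(&part_nitems[threadIdx.x],
                                            local_cnt[threadIdx.x]);
    __syncthreads();

    /* phase 3: scatter -- every row gets a unique slot in its partition */
    if (part_id >= 0)
        part_rowid[index] = local_base[part_id] + row_id;
}

int main(void)
{
    const int N = 1024;
    unsigned int h_hashes[N];
    for (int i = 0; i < N; i++)
        h_hashes[i] = i * 2654435761u;      /* arbitrary hash values */

    unsigned int *d_hashes, *d_nitems, *d_rowid;
    cudaMalloc(&d_hashes, sizeof(h_hashes));
    cudaMalloc(&d_nitems, NPARTS * sizeof(unsigned int));
    cudaMalloc(&d_rowid,  N * sizeof(unsigned int));
    cudaMemcpy(d_hashes, h_hashes, sizeof(h_hashes), cudaMemcpyHostToDevice);
    cudaMemset(d_nitems, 0, NPARTS * sizeof(unsigned int));

    partition_rows<<<(N + THREADS - 1) / THREADS, THREADS>>>(d_hashes, N,
                                                             d_nitems, d_rowid);
    unsigned int h_nitems[NPARTS];
    cudaMemcpy(h_nitems, d_nitems, sizeof(h_nitems), cudaMemcpyDeviceToHost);
    for (int p = 0; p < NPARTS; p++)
        printf("partition %d: %u rows\n", p, h_nitems[p]);
    return 0;
}

Building all partitions in one such pass over kds_src is what allows the GpuScan final buffer to be dropped right after this kernel finishes.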
kaigai committed Sep 24, 2024
1 parent 18d5904 commit 903dc6f
Showing 4 changed files with 531 additions and 400 deletions.
106 changes: 94 additions & 12 deletions src/cuda_gpujoin.cu
@@ -13,13 +13,11 @@
#include "cuda_common.h"

/*
- * GPU Buffer Partitioning
+ * GPU Buffer Reconstruction
 */
KERNEL_FUNCTION(void)
-kern_buffer_partitioning(kern_data_store *kds_dst,
-                         const kern_data_store *__restrict__ kds_src,
-                         uint32_t hash_divisor,
-                         uint32_t hash_remainder)
+kern_buffer_reconstruction(kern_data_store *kds_dst,
+                           const kern_data_store *__restrict__ kds_src)
{
    uint64_t   *rowindex = KDS_GET_ROWINDEX(kds_src);
    uint32_t    base;
@@ -43,20 +43,16 @@ kern_buffer_partitioning(kern_data_store *kds_dst,
            hitem = (kern_hashitem *)((char *)kds_src + kds_src->length
                                      - rowindex[index]
                                      - offsetof(kern_hashitem, t));
-           if (hash_divisor == 0 ||
-               (hitem->hash % hash_divisor) == hash_remainder)
-               tupsz = MAXALIGN(offsetof(kern_hashitem,
-                                         t.htup) + hitem->t.t_len);
-           else
-               hitem = NULL;
+           tupsz = MAXALIGN(offsetof(kern_hashitem,
+                                     t.htup) + hitem->t.t_len);
        }
        /* allocation of the destination buffer */
        row_id = pgstrom_stair_sum_binary(tupsz > 0, &count);
        offset = pgstrom_stair_sum_uint64(tupsz, &total_sz);
        if (get_local_id() == 0)
        {
            base_rowid = __atomic_add_uint32(&kds_dst->nitems, count);
            base_usage = __atomic_add_uint64(&kds_dst->usage, total_sz);
        }
        __syncthreads();
        /* put tuples on the destination */
@@ -83,6 +77,94 @@
    }
}

/*
 * GPU Buffer Partitioning
 */
KERNEL_FUNCTION(void)
kern_buffer_partitioning(kern_buffer_partitions *kbuf_parts,
                         const kern_data_store *__restrict__ kds_src)
{
    uint64_t   *rowindex = KDS_GET_ROWINDEX(kds_src);
    uint32_t    hash_divisor = kbuf_parts->hash_divisor;
    uint64_t   *kpart_usage = (uint64_t *)SHARED_WORKMEM(0);
    uint32_t   *kpart_nitems = (uint32_t *)(kpart_usage + hash_divisor);
    uint32_t    base;

    assert(hash_divisor >= 2);
    for (base = get_global_base();
         base < kds_src->nitems;
         base += get_global_size())
    {
        const kern_hashitem *hitem = NULL;
        kern_data_store *kds_in = NULL;
        uint32_t    index = base + get_local_id();
        uint32_t    tupsz = 0;
        int         part_id;
        uint32_t    row_id;
        uint64_t    offset;

        /* reset buffer */
        for (int p=get_local_id(); p < hash_divisor; p += get_local_size())
        {
            kpart_usage[p] = 0;
            kpart_nitems[p] = 0;
        }
        __syncthreads();

        /* fetch row from kds_src */
        if (index < kds_src->nitems)
        {
            hitem = (kern_hashitem *)((char *)kds_src + kds_src->length
                                      - rowindex[index]
                                      - offsetof(kern_hashitem, t));
            tupsz = MAXALIGN(offsetof(kern_hashitem,
                                      t.htup) + hitem->t.t_len);
            part_id = hitem->hash % hash_divisor;
            row_id = __atomic_add_uint32(&kpart_nitems[part_id], 1);
            offset = __atomic_add_uint64(&kpart_usage[part_id], tupsz);
        }
        __syncthreads();

        /* allocation of the destination buffer */
        if (get_local_id() < hash_divisor)
        {
            int     p = get_local_id();

            kds_in = kbuf_parts->parts[p].kds_in;
            kpart_nitems[p] = __atomic_add_uint32(&kds_in->nitems, kpart_nitems[p]);
            kpart_usage[p] = __atomic_add_uint64(&kds_in->usage, kpart_usage[p]);
        }
        __syncthreads();

        /* write out to the position */
        if (hitem)
        {
            kern_hashitem  *__hitem;
            uint64_t       *__hslots;

            assert(part_id >= 0 && part_id < hash_divisor);
            kds_in = kbuf_parts->parts[part_id].kds_in;
            row_id += kpart_nitems[part_id];
            offset += kpart_usage[part_id] + tupsz;

            __hslots = KDS_GET_HASHSLOT(kds_in, hitem->hash);
            __hitem = (kern_hashitem *)
                ((char *)kds_in + kds_in->length - offset);
            __hitem->hash = hitem->hash;
            __hitem->next = __atomic_exchange_uint64(__hslots, offset);
            __hitem->t.rowid = row_id;
            __hitem->t.t_len = hitem->t.t_len;
            memcpy(&__hitem->t.htup, &hitem->t.htup, hitem->t.t_len);
            assert(offsetof(kern_hashitem, t.htup) + hitem->t.t_len <= tupsz);
            __threadfence();
            KDS_GET_ROWINDEX(kds_in)[row_id] = ((char *)kds_in
                                                + kds_in->length
                                                - (char *)&__hitem->t);
        }
        __syncthreads();
    }
}

/*
 * GPU Nested-Loop
 */
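A note on the reservation pattern above: kern_buffer_reconstruction sizes its output with pgstrom_stair_sum_binary / pgstrom_stair_sum_uint64 followed by a single pair of atomics per block. Assuming those helpers return each thread's exclusive prefix sum within the block plus the block-wide total (an inference from their use here; this diff does not define them), the step reduces to the sketch below, where block_exclusive_scan is a deliberately naive stand-in and every name is invented for the example.

// Hypothetical sketch of the stair-sum reservation pattern; the
// pgstrom_stair_sum_* helpers are assumed (not shown in this diff) to
// behave like block_exclusive_scan() here, which is written for
// clarity, not performance.
#include <cstdio>
#include <cuda_runtime.h>

#define THREADS     256

__device__ unsigned long long
block_exclusive_scan(unsigned long long value, unsigned long long *total)
{
    __shared__ unsigned long long buf[THREADS];
    unsigned long long offset = 0;

    buf[threadIdx.x] = value;
    __syncthreads();
    for (int i = 0; i < threadIdx.x; i++)       /* naive O(n) scan */
        offset += buf[i];
    unsigned long long t = offset + value;
    for (int i = threadIdx.x + 1; i < blockDim.x; i++)
        t += buf[i];
    *total = t;                 /* block-wide sum, same value in every thread */
    __syncthreads();
    return offset;              /* sum of all predecessors in the block */
}

__global__ void
reserve_space(const unsigned int *sizes, int nitems,
              unsigned long long *dst_usage,     /* like kds_dst->usage */
              unsigned long long *dst_offsets)   /* where each row will go */
{
    __shared__ unsigned long long base_usage;
    int     index = blockIdx.x * blockDim.x + threadIdx.x;
    unsigned long long tupsz = (index < nitems) ? sizes[index] : 0;
    unsigned long long total_sz;
    unsigned long long offset = block_exclusive_scan(tupsz, &total_sz);

    /* one atomic per block reserves space for all of its rows at once */
    if (threadIdx.x == 0)
        base_usage = atomicAdd(dst_usage, total_sz);
    __syncthreads();

    if (tupsz > 0)
        dst_offsets[index] = base_usage + offset;
}

int main(void)
{
    const int N = 1000;
    unsigned int h_sizes[N];
    for (int i = 0; i < N; i++)
        h_sizes[i] = 64 + (i % 5) * 8;      /* fake MAXALIGN'ed tuple sizes */

    unsigned int *d_sizes;
    unsigned long long *d_usage, *d_offsets;
    cudaMalloc(&d_sizes, sizeof(h_sizes));
    cudaMalloc(&d_usage, sizeof(unsigned long long));
    cudaMalloc(&d_offsets, N * sizeof(unsigned long long));
    cudaMemcpy(d_sizes, h_sizes, sizeof(h_sizes), cudaMemcpyHostToDevice);
    cudaMemset(d_usage, 0, sizeof(unsigned long long));

    reserve_space<<<(N + THREADS - 1) / THREADS, THREADS>>>(d_sizes, N,
                                                            d_usage, d_offsets);
    unsigned long long h_usage;
    cudaMemcpy(&h_usage, d_usage, sizeof(h_usage), cudaMemcpyDeviceToHost);
    printf("total reserved: %llu bytes\n", h_usage);    /* 1000 rows x avg 80B */
    return 0;
}

kern_buffer_partitioning then publishes each copied item with __hitem->next = __atomic_exchange_uint64(__hslots, offset), a lock-free prepend onto the destination hash chain, and issues __threadfence() before the row index becomes visible to other threads.
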
2 changes: 1 addition & 1 deletion src/executor.c
@@ -2311,7 +2311,7 @@ pgstromExplainTaskState(CustomScanState *node,
    uint64_t    final_nitems = pg_atomic_read_u64(&ps_state->final_nitems);
    uint64_t    final_usage = pg_atomic_read_u64(&ps_state->final_usage);
    uint64_t    final_total = pg_atomic_read_u64(&ps_state->final_total);

    appendStringInfo(&buf, "nitems: %lu, usage: %s, total: %s",
                     final_nitems,
                     format_bytesz(final_usage),
2 changes: 1 addition & 1 deletion src/gpu_join.c
@@ -2118,7 +2118,7 @@ innerPreloadSetupPinnedInnerBufferPartitions(kern_multirels *h_kmrels,
                              parts[divisor]));
    //TODO: Phase-3 allows divisor > numGPUs restrictions
    if (divisor > numGpuDevAttrs)
-       elog(ERROR, "pinned inner-buffer partitions divisor %d larger than number of GPU devices (%d) is not supported right now", divisor, numGpuDevAttrs);
+       elog(ERROR, "pinned inner-buffer partitions divisor %d larger than number of GPU devices (%d) is not supported right now\n largest_sz=%zu largest_depth=%d", divisor, numGpuDevAttrs, largest_sz, largest_depth);
    if (h_kmrels)
    {
        kern_buffer_partitions *kbuf_parts = (kern_buffer_partitions *)