Skip to content

Commit

Permalink
bugfix: a bunch of rows were skipped after GPU kernel suspend/resume
Browse files Browse the repository at this point in the history
issue #819

When GPU kernel raised CPU-fallback first time, it once suspends
the GPU kernel because the fallback buffer is not attached on.
Then, it restarts GPU kernel, and it fetched rows according to
the wp->smx_row_count.
In the old implementation, we incremented the wp->smx_row_count
immediately on the early half of __gpuscan_load_source_xxxx().
It means, if CPU-fallback raised GPU-kernel suspend/resume,
we will see the next block even though the suspended block was
not processed.
  • Loading branch information
kaigai committed Oct 17, 2024
1 parent ff92fbb commit 54176d2
Showing 1 changed file with 33 additions and 22 deletions.
55 changes: 33 additions & 22 deletions src/cuda_gpuscan.cu
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,7 @@ __gpuscan_load_source_row(kern_context *kcxt,
kern_tupitem *tupitem = NULL;

/* compute the next row-index */
count = wp->smx_row_count;
__syncthreads();
if (get_local_id() == 0)
wp->smx_row_count++;
index = get_global_size() * count + get_global_base();
index = get_global_size() * wp->smx_row_count + get_global_base();
if (index >= kds_src->nitems)
{
if (get_local_id() == 0)
Expand Down Expand Up @@ -242,8 +238,6 @@ __gpuscan_load_source_row(kern_context *kcxt,
*/
wr_pos = WARP_WRITE_POS(wp,0);
wr_pos += pgstrom_stair_sum_binary(tupitem != NULL, &count);
if (get_local_id() == 0)
WARP_WRITE_POS(wp,0) += count;
if (tupitem != NULL)
{
if (!ExecMoveKernelVariables(kcxt,
Expand All @@ -257,6 +251,23 @@ __gpuscan_load_source_row(kern_context *kcxt,
/* error checks */
if (__syncthreads_count(kcxt->errcode != ERRCODE_STROM_SUCCESS) > 0)
return -1;
/*
* NOTE: 'smx_row_count' and 'WARP_WRITE_POS' must be updated after
* the GPU kernel suspend/resume possibility has gone.
* Once GPU kernel gets suspended, it restarts this depth again, and
* its read and write position should be immutable with the first try.
*
* When any of tuple needs CPU fallbacks but a fallback buffer is not
* allocated yet, we stop the GPU kernel once, then restart this block
* again. In this case, we have to fetch tuples from the same position
* (calculated based on smx_row_count), and have to write to the same
* WARP_WRITE_POS.
*/
if (get_local_id() == 0)
{
wp->smx_row_count++;
WARP_WRITE_POS(wp,0) += count;
}
/* move to the next depth, if more than blockSize tuples were fetched. */
return (WARP_WRITE_POS(wp,0) >= WARP_READ_POS(wp,0) + get_local_size() ? 1 : 0);
}
Expand Down Expand Up @@ -413,11 +424,7 @@ __gpuscan_load_source_arrow(kern_context *kcxt,
bool is_valid = false;

/* compute the next row-index */
count = wp->smx_row_count;
__syncthreads();
if (get_local_id() == 0)
wp->smx_row_count++;
index = get_global_size() * count + get_global_base();
index = get_global_size() * wp->smx_row_count + get_global_base();
if (index >= kds_src->nitems)
{
if (get_local_id() == 0)
Expand Down Expand Up @@ -446,8 +453,6 @@ __gpuscan_load_source_arrow(kern_context *kcxt,
*/
wr_pos = WARP_WRITE_POS(wp,0);
wr_pos += pgstrom_stair_sum_binary(is_valid, &count);
if (get_local_id() == 0)
WARP_WRITE_POS(wp,0) += count;
if (is_valid)
{
if (!ExecMoveKernelVariables(kcxt,
Expand All @@ -461,6 +466,12 @@ __gpuscan_load_source_arrow(kern_context *kcxt,
/* error checks */
if (__syncthreads_count(kcxt->errcode != ERRCODE_STROM_SUCCESS) > 0)
return -1;
/* make forward read/write pointer */
if (get_local_id() == 0)
{
wp->smx_row_count++;
WARP_WRITE_POS(wp,0) += count;
}
/* move to the next depth, if more than blockSize rows were fetched. */
return (WARP_WRITE_POS(wp,0) >= WARP_READ_POS(wp,0) + get_local_size() ? 1 : 0);
}
Expand Down Expand Up @@ -535,11 +546,7 @@ __gpuscan_load_source_column(kern_context *kcxt,
bool is_valid = false;

/* fetch next blockSize tuples */
count = wp->smx_row_count;
__syncthreads();
if (get_local_id() == 0)
wp->smx_row_count++;
index = get_global_size() * count + get_global_base();
index = get_global_size() * wp->smx_row_count + get_global_base();
if (index >= kds_src->nitems)
{
if (get_local_id() == 0)
Expand Down Expand Up @@ -570,8 +577,6 @@ __gpuscan_load_source_column(kern_context *kcxt,
*/
wr_pos = WARP_WRITE_POS(wp,0);
wr_pos += pgstrom_stair_sum_binary(is_valid, &count);
if (get_local_id() == 0)
WARP_WRITE_POS(wp,0) += count;
if (is_valid)
{
if (!ExecMoveKernelVariables(kcxt,
Expand All @@ -584,7 +589,13 @@ __gpuscan_load_source_column(kern_context *kcxt,
}
/* error checks */
if (__syncthreads_count(kcxt->errcode != ERRCODE_STROM_SUCCESS) > 0)
return -1;
return -1;
/* make forward read/write pointer */
if (get_local_id() == 0)
{
wp->smx_row_count++;
WARP_WRITE_POS(wp,0) += count;
}
/* move to the next depth if more than 32 htuples were fetched */
return (WARP_WRITE_POS(wp,0) >= WARP_READ_POS(wp,0) + get_local_size() ? 1 : 0);
}
Expand Down

0 comments on commit 54176d2

Please sign in to comment.