early m_kds_src release on inner-buffer reconstruction
kaigai committed Sep 24, 2024
1 parent baf77e1 commit 697ef24
Showing 1 changed file with 95 additions and 124 deletions.
219 changes: 95 additions & 124 deletions src/gpu_service.c
@@ -1334,14 +1334,6 @@ gpuClientELogByExtraModule(gpuClient *gclient)
*
* ----------------------------------------------------------------
*/
struct gpuMemoryTracker
{
volatile int refcnt;
CUdeviceptr m_devptr;
size_t m_length;
};
typedef struct gpuMemoryTracker gpuMemoryTracker;

struct gpuQueryBuffer
{
dlist_node chain;
@@ -1356,10 +1348,7 @@ struct gpuQueryBuffer
pthread_rwlock_t m_kds_rwlock; /* RWLock for the final/fallback buffer */
int gpumem_nitems;
int gpumem_nrooms;
gpuMemoryTracker **gpumem_trackers;

int nr_subbuffers;
struct gpuQueryBuffer **subbuffers;
CUdeviceptr *gpumem_devptrs;
struct {
CUdeviceptr m_kds_final; /* final buffer (device) */
size_t m_kds_final_length; /* length of final buffer */
@@ -1385,44 +1374,39 @@ static pthread_cond_t gpu_query_buffer_cond = PTHREAD_COND_INITIALIZER;

static gpuQueryBuffer *__getGpuQueryBuffer(uint64_t buffer_id, bool may_create);

static bool
__enlargeGpuQueryBuffer(gpuQueryBuffer *gq_buf)
{
if (gq_buf->gpumem_nitems >= gq_buf->gpumem_nrooms)
{
CUdeviceptr *__devptrs;
int __nrooms = (2 * gq_buf->gpumem_nrooms + 20);

__devptrs = realloc(gq_buf->gpumem_devptrs,
sizeof(CUdeviceptr) * __nrooms);
if (!__devptrs)
return false;
gq_buf->gpumem_devptrs = __devptrs;
gq_buf->gpumem_nrooms = __nrooms;
}
return true;
}

static CUresult
allocGpuQueryBuffer(gpuQueryBuffer *gq_buf,
CUdeviceptr *p_devptr, size_t bytesize)
{
gpuMemoryTracker *track;
CUdeviceptr m_devptr;
CUresult rc;

/* enlarge the room */
if (gq_buf->gpumem_nitems >= gq_buf->gpumem_nrooms)
{
gpuMemoryTracker **__trackers;
int __nrooms = (2 * gq_buf->gpumem_nrooms + 20);

__trackers = realloc(gq_buf->gpumem_trackers,
sizeof(gpuMemoryTracker) * __nrooms);
if (!__trackers)
return CUDA_ERROR_OUT_OF_MEMORY;
gq_buf->gpumem_trackers = __trackers;
gq_buf->gpumem_nrooms = __nrooms;
}
/* setup tracker object */
track = calloc(1, sizeof(gpuMemoryTracker));
if (!track)
if (!__enlargeGpuQueryBuffer(gq_buf))
return CUDA_ERROR_OUT_OF_MEMORY;
/* allocation */
rc = cuMemAllocManaged(&m_devptr, bytesize,
CU_MEM_ATTACH_GLOBAL);
if (rc != CUDA_SUCCESS)
{
free(track);
return rc;
}
track->refcnt = 1;
track->m_devptr = m_devptr;
track->m_length = bytesize;
gq_buf->gpumem_trackers[gq_buf->gpumem_nitems++] = track;

gq_buf->gpumem_devptrs[gq_buf->gpumem_nitems++] = m_devptr;
*p_devptr = m_devptr;

__gsLog("Query buffer allocation at %p (sz=%s)",
@@ -1431,61 +1415,46 @@ allocGpuQueryBuffer(gpuQueryBuffer *gq_buf,
return CUDA_SUCCESS;
}

static CUresult
obtainGpuQueryBuffer(gpuQueryBuffer *gq_dst,
gpuQueryBuffer *gq_src,
CUdeviceptr m_devptr, size_t *p_bytesize)
static bool
releaseGpuQueryBufferOne(gpuQueryBuffer *gq_buf,
CUdeviceptr m_devptr)
{
/* enlarge the destination room */
if (gq_dst->gpumem_nitems >= gq_dst->gpumem_nrooms)
{
gpuMemoryTracker **__trackers;
int __nrooms = (2 * gq_dst->gpumem_nrooms + 20);

__trackers = realloc(gq_dst->gpumem_trackers,
sizeof(gpuMemoryTracker *) * __nrooms);
if (!__trackers)
return CUDA_ERROR_OUT_OF_MEMORY;
gq_dst->gpumem_trackers = __trackers;
gq_dst->gpumem_nrooms = __nrooms;
}
/* lookup the source buffer */
for (int i=0; i < gq_src->gpumem_nitems; i++)
for (int i=0; i < gq_buf->gpumem_nitems; i++)
{
gpuMemoryTracker *track = gq_src->gpumem_trackers[i];
CUresult rc;

if (track->m_devptr == m_devptr)
if (gq_buf->gpumem_devptrs[i] == m_devptr)
{
track->refcnt++;
if (p_bytesize)
*p_bytesize = track->m_length;
gq_dst->gpumem_trackers[gq_dst->gpumem_nitems++] = track;
return CUDA_SUCCESS;
gq_buf->gpumem_devptrs[i] = 0;
rc = cuMemFree(m_devptr);
if (rc != CUDA_SUCCESS)
__FATAL("failed on cuMemFree(%p): %s",
(void *)m_devptr, cuStrError(rc));
__gsLog("Query buffer release one at %p",
(void *)m_devptr);
return true;
}
}
return CUDA_ERROR_NOT_FOUND;
return false;
}


static void
releaseGpuQueryBufferAll(gpuQueryBuffer *gq_buf)
{
for (int i=0; i < gq_buf->gpumem_nitems; i++)
{
gpuMemoryTracker *track = gq_buf->gpumem_trackers[i];
CUdeviceptr m_devptr = gq_buf->gpumem_devptrs[i];
CUresult rc;

assert(track->refcnt > 0);
if (--track->refcnt == 0)
if (m_devptr != 0UL)
{
__gsLog("Query buffer release at %p (sz=%s)",
(void *)track->m_devptr, format_bytesz(track->m_length));

rc = cuMemFree(track->m_devptr);
rc = cuMemFree(m_devptr);
if (rc != CUDA_SUCCESS)
__FATAL("failed on cuMemFree(%p): %s",
(void *)track->m_devptr,
cuStrError(rc));
free(track);
(void *)m_devptr, cuStrError(rc));
__gsLog("Query buffer release all at %p",
(void *)m_devptr);
}
}
gq_buf->gpumem_nitems = 0;
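
The hunks above replace the refcounted gpuMemoryTracker objects with a flat, realloc-grown array of CUdeviceptr handles per query buffer, plus a helper that releases a single entry early. Below is a minimal standalone sketch of that bookkeeping pattern, not code from the repository; the demoQueryBuffer type and the demo_* names are invented for illustration, and error handling is reduced to return codes (the actual code logs via __gsLog and calls __FATAL when cuMemFree fails).

/* Sketch only (not repository code): per-buffer device allocations are kept
 * in a plain CUdeviceptr array; a cleared slot (0UL) means "already freed".
 * A CUDA context must already be current on the calling thread. */
#include <cuda.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
	int          nitems;		/* slots in use */
	int          nrooms;		/* slots allocated */
	CUdeviceptr *devptrs;		/* device allocations owned by this buffer */
} demoQueryBuffer;

/* grow the pointer array on demand; same growth rule as __enlargeGpuQueryBuffer */
static bool demo_enlarge(demoQueryBuffer *buf)
{
	if (buf->nitems >= buf->nrooms)
	{
		int          nrooms  = 2 * buf->nrooms + 20;
		CUdeviceptr *devptrs = realloc(buf->devptrs, sizeof(CUdeviceptr) * nrooms);

		if (!devptrs)
			return false;
		buf->devptrs = devptrs;
		buf->nrooms  = nrooms;
	}
	return true;
}

/* allocate managed memory and remember the raw handle */
static CUresult demo_alloc(demoQueryBuffer *buf, CUdeviceptr *p_devptr, size_t bytesize)
{
	CUdeviceptr m_devptr;
	CUresult    rc;

	if (!demo_enlarge(buf))
		return CUDA_ERROR_OUT_OF_MEMORY;
	rc = cuMemAllocManaged(&m_devptr, bytesize, CU_MEM_ATTACH_GLOBAL);
	if (rc != CUDA_SUCCESS)
		return rc;
	buf->devptrs[buf->nitems++] = m_devptr;
	*p_devptr = m_devptr;
	return CUDA_SUCCESS;
}

/* free one allocation early and clear its slot so the final sweep skips it */
static bool demo_release_one(demoQueryBuffer *buf, CUdeviceptr m_devptr)
{
	for (int i = 0; i < buf->nitems; i++)
	{
		if (buf->devptrs[i] == m_devptr)
		{
			buf->devptrs[i] = 0UL;
			return (cuMemFree(m_devptr) == CUDA_SUCCESS);
		}
	}
	return false;	/* not owned by this buffer */
}

/* final sweep: free whatever has not been released early */
static void demo_release_all(demoQueryBuffer *buf)
{
	for (int i = 0; i < buf->nitems; i++)
	{
		if (buf->devptrs[i] != 0UL)
			cuMemFree(buf->devptrs[i]);
	}
	buf->nitems = 0;
}

Dropping the refcount means a device allocation has exactly one owning array at any moment; sharing becomes an explicit ownership move (see the zero-copy hunk further down), which is what makes the early release introduced by this commit straightforward.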
@@ -1498,12 +1467,6 @@ __putGpuQueryBufferNoLock(gpuQueryBuffer *gq_buf)
if (--gq_buf->refcnt == 0)
{
releaseGpuQueryBufferAll(gq_buf);
if (gq_buf->subbuffers)
{
for (int i=0; i < gq_buf->nr_subbuffers; i++)
__putGpuQueryBufferNoLock(gq_buf->subbuffers[i]);
free(gq_buf->subbuffers);
}
if (gq_buf->h_kmrels)
{
if (munmap(gq_buf->h_kmrels,
@@ -1685,6 +1648,14 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
rc = cuCtxPopCurrent(NULL);
if (rc != CUDA_SUCCESS)
__FATAL("failed on cuCtxPopCurrent: %s", cuStrError(rc));

if (!releaseGpuQueryBufferOne(gq_src, m_kds_src))
{
gpuClientELog(gclient, "unable to release source of pinned-inner buffer");
goto error;
}
gq_src->gpus[i].m_kds_final = 0UL;
gq_src->gpus[i].m_kds_final_length = 0UL;
}
/* setup memory attribute for each partition */
gettimeofday(&tv2, NULL);
@@ -1752,8 +1723,6 @@ __setupGpuJoinPinnedInnerBufferPartitioned(gpuClient *gclient,
gpumask_t available_gpus = kbuf_parts->parts[i].available_gpus;

/* set read-only attribute */
fprintf(stderr, "kds_in = %p len=%zu\n", kds_in, kds_length);

rc = cuMemAdvise((CUdeviceptr)kds_in,
kds_length,
CU_MEM_ADVISE_SET_READ_MOSTLY, -1);
@@ -1860,24 +1829,24 @@ __setupGpuJoinPinnedInnerBufferReconstruct(gpuClient *gclient,
*/
for (int i=0; i < numGpuDevAttrs; i++)
{
gpuContext *__gcontext = &gpuserv_gpucontext_array[i];
CUdeviceptr __m_kds_final = gq_src->gpus[i].m_kds_final;
gpuContext *gcontext = &gpuserv_gpucontext_array[i];
CUdeviceptr m_kds_src = gq_src->gpus[i].m_kds_final;
int grid_sz;
int block_sz;
void *kern_args[4];

if (__m_kds_final == 0UL)
if (m_kds_src == 0UL)
continue;

/* Switch CUDA context to the source */
rc = cuCtxPushCurrent(__gcontext->cuda_context);
rc = cuCtxPushCurrent(gcontext->cuda_context);
if (rc != CUDA_SUCCESS)
__FATAL("failed on cuCtxPushCurrent: %s", cuStrError(rc));

/* launch buffer reconstruction kernel function */
rc = gpuOptimalBlockSize(&grid_sz,
&block_sz,
__gcontext->cufn_kbuf_reconstruction, 0);
gcontext->cufn_kbuf_reconstruction, 0);
if (rc != CUDA_SUCCESS)
{
gpuClientELog(gclient, "failed on gpuOptimalBlockSize: %s",
@@ -1886,8 +1855,8 @@
}
/* runs-reconstruction kernel */
kern_args[0] = &m_kds_in;
kern_args[1] = &__m_kds_final;
rc = cuLaunchKernel(__gcontext->cufn_kbuf_reconstruction,
kern_args[1] = &m_kds_src;
rc = cuLaunchKernel(gcontext->cufn_kbuf_reconstruction,
grid_sz, 1, 1,
block_sz, 1, 1,
0,
@@ -1913,6 +1882,14 @@ __setupGpuJoinPinnedInnerBufferReconstruct(gpuClient *gclient,
if (rc != CUDA_SUCCESS)
__FATAL("failed on cuCtxPopCurrent: %s", cuStrError(rc));

/* release m_kds_src */
if (!releaseGpuQueryBufferOne(gq_src, m_kds_src))
{
gpuClientELog(gclient, "unable to release source of pinned-inner buffer");
return false;
}
gq_src->gpus[i].m_kds_final = 0UL;
gq_src->gpus[i].m_kds_final_length = 0UL;
gpu_count++;
}
/* assign read-only attribute */
@@ -1946,36 +1923,32 @@ __setupGpuJoinPinnedInnerBufferZeroCopy(gpuClient *gclient,
kern_multirels *m_kmrels,
int depth)
{
CUresult rc;
size_t allocated;
size_t consumed;

/*
* Single input buffer with no partitioning can be reused as-is.
*/
rc = obtainGpuQueryBuffer(gq_buf,
gq_src,
(CUdeviceptr)kds_src,
&allocated);
if (rc != CUDA_SUCCESS)
if (!__enlargeGpuQueryBuffer(gq_buf))
return CUDA_ERROR_OUT_OF_MEMORY;
/* move the ownership of kds_src to gq_buf */
for (int i=0; i < gq_src->gpumem_nitems; i++)
{
gpuClientELog(gclient, "unable to obtain GPU Query Buffer: %s",
cuStrError(rc));
return false;
if (gq_src->gpumem_devptrs[i] == (CUdeviceptr)kds_src)
{
size_t consumed;

gq_src->gpumem_devptrs[i] = 0UL;
gq_buf->gpumem_devptrs[gq_buf->gpumem_nitems++] = (CUdeviceptr)kds_src;
m_kmrels->chunks[depth-1].kds_in = kds_src;
consumed = (KDS_HEAD_LENGTH(kds_src) +
sizeof(uint64_t) * kds_src->nitems +
sizeof(uint64_t) * kds_src->hash_nslots +
kds_src->usage);
__gsLog("pinned inner buffer zero-copy (total: nitems=%u, usage=%s, consumption=%s of %s allocated)",
kds_src->nitems,
format_bytesz(kds_src->usage),
format_bytesz(consumed),
format_bytesz(kds_src->length));
return true;
}
}
assert(kds_src->length <= allocated);
m_kmrels->chunks[depth-1].kds_in = kds_src;
consumed = (KDS_HEAD_LENGTH(kds_src) +
sizeof(uint64_t) * kds_src->nitems +
sizeof(uint64_t) * kds_src->hash_nslots +
kds_src->usage);

__gsLog("pinned inner buffer zero-copy (total: nitems=%u, usage=%s, consumption=%s of %s allocated)",
kds_src->nitems,
format_bytesz(kds_src->usage),
format_bytesz(consumed),
format_bytesz(allocated));
return true;
gpuClientELog(gclient, "unable to obtain GPU Query Buffer");
return false;
}
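
In the zero-copy path the source buffer is reused as-is, so instead of bumping a tracker refcount the new code moves the CUdeviceptr out of the source's tracking array and appends it to the destination's. Below is a minimal sketch of that ownership move, again relying on demoQueryBuffer and demo_enlarge from the first sketch; demo_move_ownership is an invented name.

/* Sketch only: transfer ownership of one device allocation between tracking
 * arrays without touching the data itself. After this, the destination's
 * final sweep frees the memory and the source's does not. */
static bool demo_move_ownership(demoQueryBuffer *dst,
                                demoQueryBuffer *src,
                                CUdeviceptr m_devptr)
{
	if (!demo_enlarge(dst))
		return false;							/* out of host memory */
	for (int i = 0; i < src->nitems; i++)
	{
		if (src->devptrs[i] == m_devptr)
		{
			src->devptrs[i] = 0UL;						/* source forgets it */
			dst->devptrs[dst->nitems++] = m_devptr;		/* destination now owns it */
			return true;
		}
	}
	return false;								/* not owned by the source */
}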

static bool
@@ -2003,6 +1976,7 @@ __setupGpuJoinPinnedInnerBufferCommon(gpuClient *gclient,
gpuClientELog(gclient, "pinned inner buffer[%d] was not found", depth);
return false;
}
pthreadRWLockWriteLock(&gq_src->m_kds_rwlock);

/*
* Setup kds_head of the new inner-buffer, if necessary
@@ -2024,6 +1998,7 @@
if (!kds_src)
{
gpuClientELog(gclient, "No pinned inner buffer[%d] was found", depth);
pthreadRWLockUnlock(&gq_src->m_kds_rwlock);
return false;
}
kds_head = alloca(KDS_HEAD_LENGTH(kds_src));
@@ -2073,6 +2048,7 @@ __setupGpuJoinPinnedInnerBufferCommon(gpuClient *gclient,
}
retval = true;
error:
pthreadRWLockUnlock(&gq_src->m_kds_rwlock);
putGpuQueryBuffer(gq_src);
return retval;
}
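
The three hunks above wrap the whole inspect-and-consume sequence on the source buffer in its read-write lock, taken in write mode, and release it on every exit path including the error label. A minimal sketch of that locking shape follows, using plain pthread calls in place of the pthreadRWLock* wrappers seen in the diff; demoSharedBuffer and demo_consume_shared are invented for illustration.

/* Sketch only: hold the write lock across the whole critical section and
 * make sure every return path goes through the unlock. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
	pthread_rwlock_t m_kds_rwlock;	/* guards the fields below */
	void            *kds_src;		/* shared state other threads may update */
} demoSharedBuffer;

static bool demo_consume_shared(demoSharedBuffer *shared,
                                bool (*consume)(void *kds_src))
{
	bool retval = false;

	pthread_rwlock_wrlock(&shared->m_kds_rwlock);
	if (shared->kds_src == NULL)
		goto out;						/* nothing to consume; still unlock */
	if (!consume(shared->kds_src))
		goto out;						/* error path also unlocks */
	retval = true;
out:
	pthread_rwlock_unlock(&shared->m_kds_rwlock);
	return retval;
}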
@@ -2187,9 +2163,6 @@ __setupGpuQueryJoinInnerBuffer(gpuClient *gclient,

error:
releaseGpuQueryBufferAll(gq_buf);
for (int i=0; i < gq_buf->nr_subbuffers; i++)
putGpuQueryBuffer(gq_buf->subbuffers[i]);
gq_buf->nr_subbuffers = 0;
if (h_kmrels != NULL &&
h_kmrels != MAP_FAILED)
munmap(h_kmrels, mmap_sz);
@@ -2305,11 +2278,9 @@ __expandGpuQueryGroupByBuffer(gpuClient *gclient,
gettimeofday(&tv2, NULL);
for (int i=0; i < gq_buf->gpumem_nitems; i++)
{
gpuMemoryTracker *track = gq_buf->gpumem_trackers[i];

if (track->m_devptr == (CUdeviceptr)kds_old)
if (gq_buf->gpumem_devptrs[i] == (CUdeviceptr)kds_old)
{
track->m_devptr = (CUdeviceptr)kds_new;
gq_buf->gpumem_devptrs[i] = (CUdeviceptr)kds_new;
goto found;
}
}
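
The last hunk applies the same idea to group-by buffer expansion: when the final buffer is reallocated, the raw pointer is swapped in the tracking array in place, so the eventual sweep frees the live allocation rather than the stale one. A short sketch; demo_swap_tracked is an invented name and demoQueryBuffer comes from the first sketch.

/* Sketch only: after the buffer grows into a new device allocation, update
 * the tracking slot in place so cleanup targets the live pointer. */
static bool demo_swap_tracked(demoQueryBuffer *buf,
                              CUdeviceptr kds_old, CUdeviceptr kds_new)
{
	for (int i = 0; i < buf->nitems; i++)
	{
		if (buf->devptrs[i] == kds_old)
		{
			buf->devptrs[i] = kds_new;
			return true;
		}
	}
	return false;	/* the old buffer was not tracked here */
}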