Commit 51c0b0d
add delay to apply pg_strom.max_async_tasks
issue #811
kaigai committed Oct 19, 2024
1 parent 1807801 · commit 51c0b0d
Showing 1 changed file with 57 additions and 42 deletions.
src/gpu_service.c
@@ -92,14 +92,20 @@ typedef struct
 /*
  * GPU service shared GPU variable
  */
-#define __SIGWAKEUP		SIGUSR2
-
 typedef struct
 {
-	volatile pid_t	gpuserv_pid;
 	volatile bool	gpuserv_ready_accept;
-	pg_atomic_uint32 max_async_tasks_updated;
-	pg_atomic_uint32 max_async_tasks;
+	/*
+	 * For worker startup delay (issue #811), max_async_tasks also records
+	 * the timestamp when the configuration was updated.
+	 * Lower 10bit : number of worker threads per GPU device.
+	 * Upper 54bit : timestamp in msec precision from the epoch.
+	 * (If timestamp==0, it has not been updated yet.)
+	 */
+#define MAX_ASYNC_TASKS_DELAY	4000	/* 4.0sec */
+#define MAX_ASYNC_TASKS_BITS	10
+#define MAX_ASYNC_TASKS_MASK	((1UL<<MAX_ASYNC_TASKS_BITS)-1)
+	pg_atomic_uint64 max_async_tasks;
 	pg_atomic_uint32 gpuserv_debug_output;
 } gpuServSharedState;

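The new pg_atomic_uint64 packs two fields into one word: the lower 10 bits (MAX_ASYNC_TASKS_BITS) carry the worker-thread count per GPU device, and the upper 54 bits carry the msec-precision timestamp of the last configuration change. A minimal standalone sketch of that encoding; the helper names are illustrative, not part of the commit:

#include <stdint.h>
#include <sys/time.h>

#define MAX_ASYNC_TASKS_DELAY	4000		/* msec to wait before applying */
#define MAX_ASYNC_TASKS_BITS	10
#define MAX_ASYNC_TASKS_MASK	((1UL<<MAX_ASYNC_TASKS_BITS)-1)

/* wall-clock time in msec since the epoch */
static uint64_t
current_msec(void)
{
	struct timeval ts;

	gettimeofday(&ts, NULL);
	return (uint64_t)ts.tv_sec * 1000 + ts.tv_usec / 1000;
}

/* pack the update timestamp and the worker count into one 64-bit word */
static uint64_t
pack_max_async_tasks(uint32_t nworkers)
{
	return (current_msec() << MAX_ASYNC_TASKS_BITS) |
		((uint64_t)nworkers & MAX_ASYNC_TASKS_MASK);
}

/* lower 10 bits: worker threads per GPU device */
static uint32_t
unpack_nworkers(uint64_t value)
{
	return (uint32_t)(value & MAX_ASYNC_TASKS_MASK);
}

/* upper 54 bits: msec timestamp of the last update (0 = never updated) */
static uint64_t
unpack_timestamp(uint64_t value)
{
	return value >> MAX_ASYNC_TASKS_BITS;
}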
@@ -211,25 +217,38 @@ gpuserv_debug_output_show(void)
 static void
 pgstrom_max_async_tasks_assign(int newval, void *extra)
 {
-	if (gpuserv_shared_state)
+	if (!gpuserv_shared_state)
+		__pgstrom_max_async_tasks_dummy = newval;
+	else
 	{
-		pid_t	gpuserv_pid = gpuserv_shared_state->gpuserv_pid;
+		struct timeval ts;
+		uint64_t	conf_val;
+		uint64_t	curr_val;
 
-		pg_atomic_write_u32(&gpuserv_shared_state->max_async_tasks, newval);
-		pg_atomic_write_u32(&gpuserv_shared_state->max_async_tasks_updated, 1);
-		if (gpuserv_pid != 0)
-			kill(gpuserv_pid, SIGUSR2);
+		conf_val = pg_atomic_read_u64(&gpuserv_shared_state->max_async_tasks);
+		do {
+			if ((conf_val & MAX_ASYNC_TASKS_MASK) == newval)
+				break;		/* nothing to do */
+			gettimeofday(&ts, NULL);
+			curr_val = ((ts.tv_sec * 1000L +
+						 ts.tv_usec / 1000L) << MAX_ASYNC_TASKS_BITS) | (uint64_t)newval;
+		} while (!pg_atomic_compare_exchange_u64(&gpuserv_shared_state->max_async_tasks,
+												 &conf_val,
+												 curr_val));
 	}
-	else
-		__pgstrom_max_async_tasks_dummy = newval;
 }
 
 int
 pgstrom_max_async_tasks(void)
 {
-	return (gpuserv_shared_state
-			? pg_atomic_read_u32(&gpuserv_shared_state->max_async_tasks)
-			: __pgstrom_max_async_tasks_dummy);
+	uint64_t	curr_val;
+
+	if (gpuserv_shared_state)
+	{
+		curr_val = pg_atomic_read_u64(&gpuserv_shared_state->max_async_tasks);
+		return (curr_val & MAX_ASYNC_TASKS_MASK);
+	}
+	return __pgstrom_max_async_tasks_dummy;
 }

static const char *
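Since pg_strom.max_async_tasks can be assigned from any backend while the GPU service runs, the assign hook above publishes the packed value with a compare-and-swap loop instead of a plain store. pg_atomic_compare_exchange_u64() returns false when another process changed the word in between, and on failure writes the value it actually found back into its second argument, so each retry compares against fresh state. A condensed restatement of the hook using the illustrative helpers above; 'slot' stands in for &gpuserv_shared_state->max_async_tasks:

/* assumes PostgreSQL's port/atomics.h for the pg_atomic_* types and calls */
static void
publish_max_async_tasks(pg_atomic_uint64 *slot, uint32_t newval)
{
	uint64_t	expected = pg_atomic_read_u64(slot);
	uint64_t	desired;

	do {
		/* same count already published? keep the old timestamp */
		if ((expected & MAX_ASYNC_TASKS_MASK) == newval)
			return;
		/* stamp the new count with the current time */
		desired = pack_max_async_tasks(newval);
	} while (!pg_atomic_compare_exchange_u64(slot, &expected, desired));
}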
@@ -4607,20 +4626,30 @@ __gpuContextAdjustWorkersOne(gpuContext *gcontext, uint32_t nworkers)
 static void
 __gpuContextAdjustWorkers(void)
 {
-	uint32_t	updated;
+	uint64_t	conf_val;
 
-	updated = pg_atomic_exchange_u32(&gpuserv_shared_state->max_async_tasks_updated, 0);
-	if (updated)
+	conf_val = pg_atomic_read_u64(&gpuserv_shared_state->max_async_tasks);
+	if ((conf_val & ~MAX_ASYNC_TASKS_MASK) != 0)
 	{
-		uint32_t	nworkers;
-		dlist_iter	iter;
+		uint64_t	conf_ts = (conf_val >> MAX_ASYNC_TASKS_BITS);
+		uint64_t	curr_ts;
+		struct timeval ts;
 
-		nworkers = pg_atomic_read_u32(&gpuserv_shared_state->max_async_tasks);
-		dlist_foreach(iter, &gpuserv_gpucontext_list)
+		gettimeofday(&ts, NULL);
+		curr_ts = (ts.tv_sec * 1000L + ts.tv_usec / 1000L);
+		if (curr_ts >= conf_ts + MAX_ASYNC_TASKS_DELAY)
 		{
-			gpuContext *gcontext = dlist_container(gpuContext, chain, iter.cur);
+			uint32_t	nworkers = (conf_val & MAX_ASYNC_TASKS_MASK);
+			dlist_iter	iter;
 
-			__gpuContextAdjustWorkersOne(gcontext, nworkers);
+			dlist_foreach(iter, &gpuserv_gpucontext_list)
+			{
+				gpuContext *gcontext = dlist_container(gpuContext,
+													   chain, iter.cur);
+				__gpuContextAdjustWorkersOne(gcontext, nworkers);
+			}
+			pg_atomic_compare_exchange_u64(&gpuserv_shared_state->max_async_tasks,
+										   &conf_val, (uint64_t)nworkers);
 		}
 	}
 }
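The hunk above is where the delay is actually enforced. A pending update is recognized by a non-zero timestamp field; it is applied only after MAX_ASYNC_TASKS_DELAY (4 seconds) has elapsed, and then retired by compare-exchanging the bare worker count (timestamp zero) back into the word, so the adjustment runs once per update rather than on every pass of the event loop. With this polling scheme the SIGUSR2 wake-up path becomes unnecessary, which the later hunks remove. The same flow as a condensed sketch with the illustrative helpers; adjust_all_gpu_workers() is a hypothetical stand-in for the dlist walk over gpuserv_gpucontext_list:

/* hypothetical stand-in for __gpuContextAdjustWorkersOne() over all contexts */
extern void adjust_all_gpu_workers(uint32_t nworkers);

static void
apply_pending_worker_count(pg_atomic_uint64 *slot)
{
	uint64_t	conf_val = pg_atomic_read_u64(slot);
	uint64_t	conf_ts = unpack_timestamp(conf_val);

	if (conf_ts == 0)
		return;		/* no update pending */
	if (current_msec() < conf_ts + MAX_ASYNC_TASKS_DELAY)
		return;		/* still inside the settle window; retry next pass */

	/* resize the worker pool of every GPU context */
	adjust_all_gpu_workers(unpack_nworkers(conf_val));

	/*
	 * Retire the pending update by storing the bare worker count
	 * (timestamp == 0). If a concurrent assignment raced in, this CAS
	 * fails and the fresher timestamp stays pending for a later pass.
	 */
	pg_atomic_compare_exchange_u64(slot, &conf_val,
								   (uint64_t)unpack_nworkers(conf_val));
}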
@@ -4679,9 +4708,6 @@ gpuservSetupGpuContext(int cuda_dindex)
 	else
 		gcontext->cuda_profiler_started = true;
 	}
-	/* launch worker threads */
-	pg_atomic_write_u32(&gpuserv_shared_state->max_async_tasks_updated, 1);
-
 	return gcontext;
 }

@@ -4740,12 +4766,6 @@ gpuservBgWorkerSignal(SIGNAL_ARGS)
 	errno = saved_errno;
 }
 
-static void
-gpuservBgWorkerWakeUp(SIGNAL_ARGS)
-{
-	/* nothing to do */
-}
-
 /*
  * gpuservClenupListenSocket
  */
@@ -4775,10 +4795,8 @@ gpuservBgWorkerMain(Datum arg)
 	CUresult	rc;
 	int			dindex;
 
-	gpuserv_shared_state->gpuserv_pid = getpid();
 	pqsignal(SIGTERM, gpuservBgWorkerSignal);	/* terminate GpuServ */
 	pqsignal(SIGHUP,  gpuservBgWorkerSignal);	/* restart GpuServ */
-	pqsignal(SIGUSR2, gpuservBgWorkerWakeUp);	/* interrupt epoll_wait(2) */
 	BackgroundWorkerUnblockSignals();
 
 	/* Registration of resource cleanup handler */
@@ -4855,7 +4873,6 @@
 	}
 	PG_CATCH();
 	{
-		gpuserv_shared_state->gpuserv_pid = 0;
 		gpuserv_shared_state->gpuserv_ready_accept = false;
 		gpuservCloseServerSocket();
 		while (!dlist_is_empty(&gpuserv_gpucontext_list))
@@ -4871,7 +4888,6 @@
 	PG_END_TRY();
 
 	/* cleanup */
-	gpuserv_shared_state->gpuserv_pid = 0;
 	gpuserv_shared_state->gpuserv_ready_accept = false;
 	gpuservCloseServerSocket();
 	while (!dlist_is_empty(&gpuserv_gpucontext_list))
@@ -4916,9 +4932,8 @@ pgstrom_startup_executor(void)
 								   MAXALIGN(sizeof(gpuServSharedState)),
 								   &found);
 	memset(gpuserv_shared_state, 0, sizeof(gpuServSharedState));
-	pg_atomic_init_u32(&gpuserv_shared_state->max_async_tasks_updated, 1);
-	pg_atomic_init_u32(&gpuserv_shared_state->max_async_tasks,
-					   __pgstrom_max_async_tasks_dummy);
+	pg_atomic_init_u64(&gpuserv_shared_state->max_async_tasks,
+					   __pgstrom_max_async_tasks_dummy | (1UL<<MAX_ASYNC_TASKS_BITS));
 	pg_atomic_init_u32(&gpuserv_shared_state->gpuserv_debug_output,
 					   __gpuserv_debug_output_dummy);
 }
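Note the initial value: the worker count is OR'ed with 1UL<<MAX_ASYNC_TASKS_BITS, so the timestamp field starts at 1 rather than 0 or the current time. Non-zero means __gpuContextAdjustWorkers() sees a pending update, and a timestamp of 1 msec after the epoch already satisfies the 4-second window, so the initial worker threads launch on the first pass of the event loop instead of waiting out the delay; this replaces both the old max_async_tasks_updated flag and the write that gpuservSetupGpuContext() used to do. A small check of that arithmetic with the illustrative helpers from above (the count 16 is arbitrary):

#include <assert.h>

int
main(void)
{
	uint64_t	init_val = 16 | (1UL << MAX_ASYNC_TASKS_BITS);

	assert(unpack_nworkers(init_val) == 16);
	assert(unpack_timestamp(init_val) == 1);	/* pending, but ancient */
	/* the 4-second window expired long ago, so workers start at once */
	assert(current_msec() >= unpack_timestamp(init_val) + MAX_ASYNC_TASKS_DELAY);
	return 0;
}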