Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
colesbury committed Nov 20, 2023
1 parent be0bd54 commit 8d11726
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 6 deletions.
1 change: 1 addition & 0 deletions Include/internal/pycore_ceval.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ void _PyEval_FrameClearAndPop(PyThreadState *tstate, _PyInterpreterFrame *frame)
#define _PY_CALLS_TO_DO_BIT 2
#define _PY_ASYNC_EXCEPTION_BIT 3
#define _PY_GC_SCHEDULED_BIT 4
#define _PY_EVAL_PLEASE_STOP_BIT 5

/* Reserve a few bits for future use */
#define _PY_EVAL_EVENTS_BITS 8
Expand Down
14 changes: 14 additions & 0 deletions Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ struct _Py_long_state {
int max_str_digits;
};

// Support for stop-the-world events. This exists in both the PyRuntime struct
// for global pauses and in each PyInterpreterState for per-interpreter pauses.
struct _stoptheworld_state {
PyMutex mutex; // Serializes stop-the-world attempts.
uint8_t requested; // Set to 1 when a pause is requested.
uint8_t world_stopped; // Set to 1 when the world is stopped.
uint8_t is_global; // Set to 1 when contained in PyRuntime struct.

PyEvent stop_event; // Set when thread_countdown reaches zero.
Py_ssize_t thread_countdown; // Number of threads that must pause.

PyThreadState *requester; // Thread that requested the pause (may be NULL).
};

/* cross-interpreter data registry */

Expand Down Expand Up @@ -164,6 +177,7 @@ struct _is {

struct _warnings_runtime_state warnings;
struct atexit_state atexit;
struct _stoptheworld_state stoptheworld;

struct _obmalloc_state obmalloc;

Expand Down
3 changes: 2 additions & 1 deletion Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
// Wait for the event to be set, or until the timeout expires. If the event is
// already set, then this returns immediately. Returns 1 if the event was set,
// and 0 if the timeout expired or thread was interrupted.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns);
// Valid values for `flags` are _Py_LOCK_DONT_DETACH and _PY_LOCK_DETACH.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns, _PyLockFlags flags);


// _PyRawMutex implements a word-sized mutex that that does not depend on the
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ typedef struct pyruntimestate {
struct _faulthandler_runtime_state faulthandler;
struct _tracemalloc_runtime_state tracemalloc;

struct _stoptheworld_state stoptheworld;

PyPreConfig preconfig;

// Audit values must be preserved when Py_Initialize()/Py_Finalize()
Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_runtime_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ extern PyTypeObject _PyExc_MemoryError;
}, \
.faulthandler = _faulthandler_runtime_state_INIT, \
.tracemalloc = _tracemalloc_runtime_state_INIT, \
.stoptheworld = { \
.is_global = 1, \
}, \
.float_state = { \
.float_format = _py_float_format_unknown, \
.double_format = _py_float_format_unknown, \
Expand Down
3 changes: 3 additions & 0 deletions Include/pymacro.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@
Py_FatalError("Unreachable C code path reached")
#endif

#define _Py_CONTAINER_OF(ptr, type, member) \
(type*)((char*)ptr - offsetof(type, member))

// Prevent using an expression as a l-value.
// For example, "int x; _Py_RVALUE(x) = 1;" fails with a compiler error.
#define _Py_RVALUE(EXPR) ((void)0, (EXPR))
Expand Down
7 changes: 4 additions & 3 deletions Python/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,12 @@ _PyEvent_Notify(PyEvent *evt)
void
PyEvent_Wait(PyEvent *evt)
{
while (!PyEvent_WaitTimed(evt, -1))
while (!PyEvent_WaitTimed(evt, -1, _PY_LOCK_DETACH))
;
}

int
PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns)
PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns, _PyLockFlags flags)
{
for (;;) {
uint8_t v = _Py_atomic_load_uint8(&evt->v);
Expand All @@ -288,9 +288,10 @@ PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns)
}
}

int detach = (flags & _PY_LOCK_DETACH) != 0;
uint8_t expected = _Py_HAS_PARKED;
(void) _PyParkingLot_Park(&evt->v, &expected, sizeof(evt->v),
timeout_ns, NULL, 1);
timeout_ns, NULL, detach);

return _Py_atomic_load_uint8(&evt->v) == _Py_LOCKED;
}
Expand Down
9 changes: 7 additions & 2 deletions Python/parking_lot.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ _PySemaphore_PlatformWait(_PySemaphore *sema, _PyTime_t timeout)
if (timeout >= 0) {
struct timespec ts;

# ifdef HAVE_SEM_CLOCKWAIT
_PyTime_t deadline = _PyDeadline_Init(timeout);
_PyTime_AsTimespec_clamp(deadline, &ts);
err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
# else
_PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout);
_PyTime_AsTimespec(deadline, &ts);

_PyTime_AsTimespec_clamp(deadline, &ts);
err = sem_timedwait(&sema->platform_sem, &ts);
#endif
}
else {
err = sem_wait(&sema->platform_sem);
Expand Down
200 changes: 200 additions & 0 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,206 @@ _PyThreadState_Detach(PyThreadState *tstate)
_PyEval_ReleaseLock(tstate->interp, tstate);
}

void
_PyThreadState_GC_Stop(PyThreadState *tstate)
{
_PyRuntimeState *runtime = &_PyRuntime;

assert(tstate->status == _Py_THREAD_ATTACHED);

struct stoptheworld_state *stw = NULL;
HEAD_LOCK(runtime);
if (runtime->stoptheworld.requested) {
stw = &runtime->stoptheworld;
}
else if (tstate->interp->stoptheworld.requested) {
stw = &tstate->interp->stoptheworld;
}
HEAD_UNLOCK(runtime);

if (stw == NULL) {
// We might be processing a stale EVAL_PLEASE_STOP, in which
// case there is nothing to do. This can happen if a thread
// asks us to stop for a previous GC at the same time we detach.
return;
}

_Py_qsbr_offline(((PyThreadStateImpl *)tstate)->qsbr);

if (tstate->critical_section != 0) {
_Py_critical_section_end_all(tstate);
}

_Py_atomic_store_int(&tstate->status, _Py_THREAD_GC);

HEAD_LOCK(runtime);
// Decrease stw_thread_countdown. If we're the last thread to stop,
// notify the thread that requested the stop-the-world.
runtime->stw_thread_countdown--;
assert(runtime->stw_thread_countdown >= 0);
if (runtime->stw_thread_countdown == 0) {
_PyRawEvent_Notify(&runtime->stw_stop_event);
}
HEAD_UNLOCK(runtime);

_PyThreadState_GC_Park(tstate);
}


void
_PyThreadState_Park(PyThreadState *tstate)
{
do {
int expected = _Py_THREAD_GC;

// Wait until we're switched out of GC to DETACHED.
_PyParkingLot_Park(&tstate->state, &expected, sizeof(tstate->state),
/*timeout=*/-1, NULL, /*detach=*/0);

// Once we're back in DETACHED we can re-attach
} while (!tstate_try_attach(tstate));
}


// ???

static PyInterpreterState *
interp_for_stop_the_world(struct _stoptheworld_state *stw)
{
if (stw->is_global) {
// Global stop-the-world events loop over all interpreters.
return PyInterpreterState_Head();
}
// Otherwise, get the PyInterpreterState that contains the `stw`.
return _Py_CONTAINER_OF(stw, PyInterpreterState, stoptheworld);
}

// Loops over threads for a stop-the-world event.
// For global: all threads in all interpreters
// For per-interpreter: all threads in the interpreter
#define _Py_FOR_EACH_THREAD(stw) \
for (PyInterpreterState *i = interp_for_stop_the_world((stw)); \
i != NULL; i = ((stw->is_global) ? i->next : NULL)) \
for (PyThreadState *t = i->threads.head; t; t = t->next)


// Try to transition threads atomically from the "detached" state to the
// "gc stopped" state. Returns true if all threads are in the "gc stopped"
static bool
park_detached_threads(struct _stoptheworld_state *stw)
{
int num_parked = 0;
_Py_FOR_EACH_THREAD(stw) {
int state = _Py_atomic_load_int_relaxed(&t->state);
if (state == _Py_THREAD_DETACHED) {
// Atomically transition to _Py_THREAD_GC if in detached state.
if (_Py_atomic_compare_exchange_int(&t->state,
&state, _Py_THREAD_GC)) {
num_parked++;
}
}
else if (state == _Py_THREAD_ATTACHED && t != stw->requester) {
_Py_set_eval_breaker_bit(t->interp, _PY_EVAL_PLEASE_STOP_BIT, 1);
}
}
stw->thread_countdown -= num_parked;
assert(stw->thread_countdown >= 0);
return num_parked > 0 && stw->thread_countdown == 0;
}

static void
stop_the_world(struct _stoptheworld_state *stw)
{
_PyRuntimeState *runtime = &_PyRuntime;

PyMutex_Lock(&stw->mutex);

HEAD_LOCK(runtime);
stw->requested = 1;
stw->thread_countdown = 0;
stw->requester = _PyThreadState_GET(); // may be NULL

_Py_FOR_EACH_THREAD(stw) {
if (t != stw->requester) {
// Count all the other threads (we don't wait on ourself).
stw->thread_countdown++;
}
}

if (stw->thread_countdown == 0) {
HEAD_UNLOCK(runtime);
stw->world_stopped = 1;
return;
}

for (;;) {
// Switch threads that are detached to the GC stopped state
bool stopped_all_threads = park_detached_threads(stw);
HEAD_UNLOCK(runtime);

if (stopped_all_threads) {
break;
}

int64_t wait_ns = 1000*1000; // 1ms
if (PyEvent_WaitTimed(&stw->stop_event, wait_ns, _Py_LOCK_DONT_DETACH)) {
assert(stw->thread_countdown == 0);
stw->stop_event = (PyEvent){0};
break;
}

HEAD_LOCK(runtime);
}
stw->world_stopped = 1;
}

static void
start_the_world(struct _stoptheworld_state *stw)
{
_PyRuntimeState *runtime = &_PyRuntime;
assert(PyMutex_IsLocked(&stw->mutex));

HEAD_LOCK(runtime);
stw->requested = 0;
stw->world_stopped = 0;
stw->requester = NULL;
_Py_FOR_EACH_THREAD(stw) {
int state = _Py_atomic_load_int_relaxed(&t->state);
if (state == _Py_THREAD_GC &&
_Py_atomic_compare_exchange_int(&t->state,
&state,
_Py_THREAD_DETACHED)) {
_PyParkingLot_UnparkAll(&t->state);
}
}
HEAD_UNLOCK(runtime);
PyMutex_Unlock(&stw->mutex);
}

void
_PyRuntimeState_StopTheWorld(_PyRuntimeState *runtime)
{
stop_the_world(&runtime->stoptheworld);
}

void
_PyRuntimeState_StartTheWorld(_PyRuntimeState *runtime)
{
start_the_world(&runtime->stoptheworld);
}

void
_PyInterpreterState_StopTheWorld(PyInterpreterState *interp)
{
stop_the_world(&interp->stoptheworld);
}

void
_PyInterpreterState_StartTheWorld(PyInterpreterState *interp)
{
start_the_world(&interp->stoptheworld);
}

//----------
// other API
//----------
Expand Down

0 comments on commit 8d11726

Please sign in to comment.