diff --git a/.gitignore b/.gitignore index d021cd45e..b2c37f41a 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,5 @@ __pycache__/ .act-artifacts/ .secrets actionlint +build-debug/ +build-tsan/ diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index 9ec286c27..c9cae6a17 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -98,13 +98,18 @@ namespace phlex::experimental { } else { accessor a; if (stores_.insert(a, store->index()->hash())) { - auto result = call(ft, messages, std::make_index_sequence{}); - ++calls_; - ++product_count_[store->index()->layer_hash()]; - products new_products; - new_products.add_all(output_, std::move(result)); - a->second = std::make_shared( - store->index(), this->full_name(), std::move(new_products)); + try { + auto result = call(ft, messages, std::make_index_sequence{}); + ++calls_; + ++product_count_[store->index()->layer_hash()]; + products new_products; + new_products.add_all(output_, std::move(result)); + a->second = std::make_shared( + store->index(), this->full_name(), std::move(new_products)); + } catch (...) 
{ + stores_.erase(a); + throw; + } message const new_msg{a->second, message_id}; stay_in_graph.try_put(new_msg); diff --git a/plugins/python/src/lifelinewrap.cpp b/plugins/python/src/lifelinewrap.cpp index a00e1d72f..f954b48df 100644 --- a/plugins/python/src/lifelinewrap.cpp +++ b/plugins/python/src/lifelinewrap.cpp @@ -9,35 +9,18 @@ static py_lifeline_t* ll_new(PyTypeObject* pytype, PyObject*, PyObject*) { py_lifeline_t* pyobj = (py_lifeline_t*)pytype->tp_alloc(pytype, 0); if (!pyobj) - PyErr_Print(); + return nullptr; pyobj->m_view = nullptr; new (&pyobj->m_source) std::shared_ptr{}; return pyobj; } -static int ll_traverse(py_lifeline_t* pyobj, visitproc visit, void* args) -{ - if (pyobj->m_view) - visit(pyobj->m_view, args); - return 0; -} - -static int ll_clear(py_lifeline_t* pyobj) -{ - Py_CLEAR(pyobj->m_view); - return 0; -} - static void ll_dealloc(py_lifeline_t* pyobj) { - // This type participates in GC; untrack before clearing references so the - // collector does not traverse a partially torn-down object during dealloc. - PyObject_GC_UnTrack(pyobj); Py_CLEAR(pyobj->m_view); typedef std::shared_ptr generic_shared_t; pyobj->m_source.~generic_shared_t(); - // Use tp_free to pair with tp_alloc for GC-tracked Python objects. 
Py_TYPE(pyobj)->tp_free((PyObject*)pyobj); } @@ -62,10 +45,10 @@ PyTypeObject phlex::experimental::PhlexLifeline_Type = { 0, // tp_getattro 0, // tp_setattro 0, // tp_as_buffer - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, // tp_flags + Py_TPFLAGS_DEFAULT, // tp_flags (char*)"internal", // tp_doc - (traverseproc)ll_traverse, // tp_traverse - (inquiry)ll_clear, // tp_clear + 0, // tp_traverse + 0, // tp_clear 0, // tp_richcompare 0, // tp_weaklistoffset 0, // tp_iter diff --git a/plugins/python/src/modulewrap.cpp b/plugins/python/src/modulewrap.cpp index bfc475876..f52c8135f 100644 --- a/plugins/python/src/modulewrap.cpp +++ b/plugins/python/src/modulewrap.cpp @@ -17,7 +17,7 @@ using phlex::product_query; // TODO: the layer is currently hard-wired and should come from the product // specification instead, but that doesn't exist in Python yet. -static std::string const LAYER = "job"; +static std::string const LAYER = "event"; // Simple phlex module wrapper // clang-format off @@ -58,8 +58,16 @@ namespace { static inline PyObject* lifeline_transform(intptr_t arg) { PyObject* pyobj = (PyObject*)arg; - if (pyobj && PyObject_TypeCheck(pyobj, &PhlexLifeline_Type)) { - return ((py_lifeline_t*)pyobj)->m_view; + if (!pyobj) { + throw std::runtime_error("lifeline_transform received null PyObject* argument"); + } + if (PyObject_TypeCheck(pyobj, &PhlexLifeline_Type)) { + PyObject* view = ((py_lifeline_t*)pyobj)->m_view; + if (!view) { + throw std::runtime_error( + "PhlexLifeline has null m_view pointer - object may be uninitialized or corrupted"); + } + return view; } return pyobj; } @@ -107,8 +115,14 @@ namespace { PyGILRAII gil; - PyObject* result = - PyObject_CallFunctionObjArgs(m_callable, lifeline_transform(args)..., nullptr); + // Extract views from PhlexLifeline objects. The returned pointers are + // borrowed references; INCREF them to create temporary owned references + // that survive for the duration of the Python call. 
+ PyObject* py_args[N] = {lifeline_transform(args)...}; + for (auto* p : py_args) + Py_INCREF(p); + + PyObject* result = call_with_array(py_args, std::make_index_sequence{}); std::string error_msg; if (!result) { @@ -116,6 +130,8 @@ namespace { error_msg = "Unknown python error"; } + for (auto* p : py_args) + Py_DECREF(p); decref_all(args...); if (!error_msg.empty()) { @@ -132,7 +148,14 @@ namespace { PyGILRAII gil; - PyObject* result = PyObject_CallFunctionObjArgs(m_callable, (PyObject*)args..., nullptr); + // Extract views from PhlexLifeline objects. The returned pointers are + // borrowed references; INCREF them to create temporary owned references + // that survive for the duration of the Python call. + PyObject* py_args[N] = {lifeline_transform(args)...}; + for (auto* p : py_args) + Py_INCREF(p); + + PyObject* result = call_with_array(py_args, std::make_index_sequence{}); std::string error_msg; if (!result) { @@ -141,6 +164,8 @@ namespace { } else Py_DECREF(result); + for (auto* p : py_args) + Py_DECREF(p); decref_all(args...); if (!error_msg.empty()) { @@ -149,6 +174,12 @@ namespace { } private: + template + PyObject* call_with_array(PyObject* (&py_args)[N], std::index_sequence) + { + return PyObject_CallFunctionObjArgs(m_callable, py_args[Is]..., nullptr); + } + template void decref_all(Args... args) { @@ -358,7 +389,7 @@ namespace { PyGILRAII gil; \ \ if (!v) \ - return (intptr_t)nullptr; \ + throw std::runtime_error("null vector<" #cpptype "> passed to " #name "_to_py"); \ \ /* use a numpy view with the shared pointer tied up in a lifeline object (note: this */ \ /* is just a demonstrator; alternatives are still being considered) */ \ @@ -370,8 +401,12 @@ namespace { (void*)(v->data()) /* raw buffer */ \ ); \ \ - if (!np_view) \ - return (intptr_t)nullptr; \ + if (!np_view) { \ + std::string py_msg; \ + msg_from_py_error(py_msg, true); \ + throw std::runtime_error("failed to create numpy array in " #name "_to_py" + \ + (py_msg.empty() ? 
std::string{} : ": " + py_msg)); \ + } \ \ /* make the data read-only by not making it writable */ \ PyArray_CLEARFLAGS((PyArrayObject*)np_view, NPY_ARRAY_WRITEABLE); \ @@ -383,7 +418,10 @@ namespace { (py_lifeline_t*)PhlexLifeline_Type.tp_new(&PhlexLifeline_Type, nullptr, nullptr); \ if (!pyll) { \ Py_DECREF(np_view); \ - return (intptr_t)nullptr; \ + std::string py_msg; \ + msg_from_py_error(py_msg, true); \ + throw std::runtime_error("failed to create lifeline in " #name "_to_py" + \ + (py_msg.empty() ? std::string{} : ": " + py_msg)); \ } \ pyll->m_source = v; \ pyll->m_view = np_view; /* steals reference */ \ diff --git a/plugins/python/src/python-refcounting.md b/plugins/python/src/python-refcounting.md new file mode 100644 index 000000000..40d6bacf2 --- /dev/null +++ b/plugins/python/src/python-refcounting.md @@ -0,0 +1,238 @@ +# Python Reference Counting in the Phlex Python Plugin + +## Overview + +Phlex's Python plugin bridges C++ and Python through `intptr_t` values +that represent `PyObject*` pointers. These values flow through the +framework's `declared_transform` nodes, which cache their outputs for +reuse when duplicate hashes appear. This document describes the +reference counting discipline used to avoid both leaks and crashes. + +## Architecture + +A typical Python transform pipeline looks like this: + +```text +Provider → [input converter] → [py_callback] → [output converter] → Observer + (C++ → PyObject) (Py → Py) (PyObject → C++) +``` + +Each `[…]` above is its own `declared_transform` node. Each node is +named with a combination of the type, the product label, and the +consumer algorithm, so every converter has a **unique** identity. 
+ +## The Caching Mechanism + +`declared_transform` (see `phlex/core/declared_transform.hpp`) caches +its output product store by `data_cell_index::hash()`: + +```cpp +if (stores_.insert(a, hash)) { + // First time: call the transform, store result + a->second = make_shared(...); +} else { + // Cache hit: reuse a->second without calling the transform +} +``` + +Each transform function is called **exactly once** per unique hash. +Duplicate events with the same hash reuse the cached product store +without re-invoking the transform. The cache entry is erased once +`done_with()` returns true (all events for that hash have been +processed). + +## The One-to-One Model + +Because each converter node has a unique name and each transform is +called once per unique hash, reference counting is **one-to-one**: + +1. **Input converters** create a new `PyObject*` reference (refcnt=1). +2. **Consumers** (`py_callback` or output converters) `Py_DECREF` the + reference after use (refcnt→0, freed). + +Consumers must not take an extra, unmatched reference on their inputs: +an unbalanced `Py_INCREF` would produce a net +1 leak per product. + +### Input Converters (`_to_py`) + +Each input converter creates a **new reference** that is stored in the +product store as an `intptr_t`. The framework treats `intptr_t` as an +opaque integer — it has no C++ destructor and cannot call `Py_DECREF`. +The consumer is responsible for releasing the reference. + +```cpp +// BASIC_CONVERTER: creates new reference via Python C API +static intptr_t int_to_py(int a) { + PyGILRAII gil; + return (intptr_t)PyLong_FromLong(a); // new reference, refcnt=1 +} + +// VECTOR_CONVERTER: creates new PhlexLifeline wrapping a numpy view +static intptr_t vint_to_py(shared_ptr<vector<int>> const& v) { + // ... creates PyArrayObject + PhlexLifeline ... 
+ return (intptr_t)pyll; // new reference, refcnt=1 +} +``` + +### py_callback (`call` / `callv`) + +`py_callback::call()` and `py_callback::callv()` receive `intptr_t` +args that are **owned references** created by the upstream input +converter, dedicated to this specific consumer. They: + +1. Call `lifeline_transform()` to unwrap any `PhlexLifeline` objects + (extracting the numpy view for the Python function) and store the + results in a local array. +2. `Py_INCREF` each transformed arg to create a temporary owned + reference — this protects borrowed `m_view` pointers from being + invalidated during the Python call. +3. Pass the transformed args to `PyObject_CallFunctionObjArgs()`. +4. `Py_DECREF` each transformed arg (releasing the temporary refs). +5. Call `decref_all(args...)` to release the original input references + (PhlexLifeline objects and basic PyObjects). + +```cpp +template <typename... Args> +intptr_t call(Args... args) { + PyGILRAII gil; + PyObject* py_args[N] = {lifeline_transform(args)...}; + for (auto* p : py_args) + Py_INCREF(p); + PyObject* result = call_with_array(py_args, ...); + // ... error handling ... + for (auto* p : py_args) + Py_DECREF(p); + decref_all(args...); // release input references + return (intptr_t)result; // new reference from Python call +} +``` + +Both `call()` and `callv()` use `lifeline_transform()` symmetrically. + +### Output Converters (`py_to_*`) + +Output converters receive the `intptr_t` from `py_callback`'s product +store — again, an **owned reference** created by the Python call, +dedicated to this specific output converter. They extract the C++ +value, then `Py_DECREF` the input: + +```cpp +// BASIC_CONVERTER py_to_* +static int py_to_int(intptr_t pyobj) { + PyGILRAII gil; + int i = (int)PyLong_AsLong((PyObject*)pyobj); + // ... error handling ... 
+ Py_DECREF((PyObject*)pyobj); // release the reference + return i; +} + +// NUMPY_ARRAY_CONVERTER py_to_* +static shared_ptr<vector<int>> py_to_vint(intptr_t pyobj) { + PyGILRAII gil; + auto vec = make_shared<vector<int>>(); + // ... copy data from PyArray or PyList ... + Py_DECREF((PyObject*)pyobj); // release the reference + return vec; +} +``` + +## Reference Flow Diagram + +```text + ┌──────────────┐ + │ Provider │ C++ value (int, float, vector) + └──────┬───────┘ + │ + ┌──────▼───────┐ + │ input conv. │ Creates NEW PyObject* (refcnt=1) + │ (e.g. int_ │ Stored in product_store as intptr_t + │ to_py) │ One per unique hash; consumer owns it + └──────┬───────┘ + │ intptr_t (owned reference, refcnt=1) + ┌──────▼───────┐ + │ py_callback │ lifeline_transform() + INCREF to protect + │ ::call() │ PyObject_CallFunctionObjArgs() + │ │ DECREF views, decref_all(args...) → freed + │ │ Returns result (NEW reference, refcnt=1) + └──────┬───────┘ + │ intptr_t (owned reference, refcnt=1) + ┌──────▼───────┐ + │ output conv. │ Reads PyObject* value + │ (e.g. py_to_ │ Py_DECREF → refcnt=0, freed + │ int) │ Returns C++ value + └──────┬───────┘ + │ + ┌──────▼───────┐ + │ Observer │ Uses C++ value + └──────────────┘ +``` + +## Error Handling + +### declared_transform Exception Safety + +`declared_transform::stores_.insert()` creates an entry with a null +`product_store_ptr` before calling the transform function. If the +transform throws (e.g., because a converter encounters an error), the +null entry must be erased to prevent SEGFAULT on cache reuse. This is +handled by a `try/catch` block that erases the stale entry and +re-throws. + +### VECTOR_CONVERTER Must Throw on Error + +`VECTOR_CONVERTER` error paths throw `std::runtime_error` instead of +returning `(intptr_t)nullptr`. A null `intptr_t` passed to +`PyObject_CallFunctionObjArgs` would act as the argument-list sentinel, +silently truncating the argument list. 
+ +### lifeline_transform Returns a Borrowed Reference + +`lifeline_transform()` unwraps `PhlexLifeline` objects to extract the +numpy array view (`m_view`). The returned pointer is a borrowed +reference from the lifeline object. Because this borrowed reference +could be invalidated by concurrent operations, `call()` and `callv()` +INCREF the transformed args before the Python call and DECREF them +after, creating temporary owned references that protect the views. + +### ll_new Must Return nullptr on Allocation Failure + +`ll_new` (in `lifelinewrap.cpp`) returns `nullptr` when `tp_alloc` +fails, rather than falling through to dereference the null pointer. + +### PhlexLifeline Must Not Use Py_TPFLAGS_HAVE_GC + +`PhlexLifeline` does not participate in reference cycles — its +`m_view` points to a numpy array (which is not GC-tracked) and nothing +references the `PhlexLifeline` from tracked Python objects. + +With `Py_TPFLAGS_HAVE_GC`, `PyType_GenericAlloc` would track every +freshly allocated `PhlexLifeline` in the garbage collector, making it +eligible for GC cycle detection. During cycle detection the collector +visits every tracked object — potentially while the object is still +being initialized in `ll_new` or while a TBB worker thread is +concurrently dereferencing `m_view`. Removing the flag avoids this +class of race entirely and eliminates a spurious `PyObject_GC_UnTrack` +call on destruction. + +## Common Pitfalls + +1. **Do not remove `decref_all` from `py_callback` or `Py_DECREF` from + output converters.** These are the "consume" side of the one-to-one + contract. Removing them leaks every converted object. + +2. **Do not add bare `Py_XINCREF`/`Py_XDECREF` that change the net + refcount.** The INCREF/DECREF in `call()`/`callv()` is balanced + (net +0) and only protects borrowed views during the Python call. + Adding unbalanced INCREF/DECREF introduces leaks. + +3. 
**Do not return `(intptr_t)nullptr` from converters.** It acts as + the `PyObject_CallFunctionObjArgs` sentinel. Throw an exception + instead. + +## Known Limitations + +- `BASIC_CONVERTER`'s `_to_py` functions can return + `(intptr_t)nullptr` if the underlying Python C API call fails (e.g., + `PyLong_FromLong` on OOM). This null would propagate through the + product store until `lifeline_transform()` throws on it, skipping `decref_all`. This is an + extremely rare edge case that has not been addressed yet. diff --git a/test/python/source.cpp b/test/python/source.cpp index a08cb01cf..5211fc062 100644 --- a/test/python/source.cpp +++ b/test/python/source.cpp @@ -7,58 +7,58 @@ using namespace phlex; PHLEX_REGISTER_PROVIDERS(s) { s.provide("provide_i", [](data_cell_index const& id) -> int { return id.number() % 2; }) - .output_product("i"_in("job")); + .output_product("i"_in("event")); s.provide("provide_j", [](data_cell_index const& id) -> int { return 1 - (int)(id.number() % 2); }) - .output_product("j"_in("job")); + .output_product("j"_in("event")); s.provide("provide_k", [](data_cell_index const&) -> int { return 0; }) - .output_product("k"_in("job")); + .output_product("k"_in("event")); s.provide("provide_f1", [](data_cell_index const& id) -> float { return (float)((id.number() % 100) / 100.0); }) - .output_product("f1"_in("job")); + .output_product("f1"_in("event")); s.provide( "provide_f2", [](data_cell_index const& id) -> float { return 1.0f - (float)((id.number() % 100) / 100.0); }) - .output_product("f2"_in("job")); + .output_product("f2"_in("event")); s.provide( "provide_d1", [](data_cell_index const& id) -> double { return (double)((id.number() % 100) / 100.0); }) - .output_product("d1"_in("job")); + .output_product("d1"_in("event")); s.provide("provide_d2", [](data_cell_index const& id) -> double { return 1.0 - (double)((id.number() % 100) / 100.0); }) - .output_product("d2"_in("job")); + .output_product("d2"_in("event")); s.provide( "provide_u1", [](data_cell_index const& id) -> 
unsigned int { return (unsigned int)(id.number() % 2); }) - .output_product("u1"_in("job")); + .output_product("u1"_in("event")); s.provide( "provide_u2", [](data_cell_index const& id) -> unsigned int { return 1 - (unsigned int)(id.number() % 2); }) - .output_product("u2"_in("job")); + .output_product("u2"_in("event")); s.provide("provide_l1", [](data_cell_index const& id) -> long { return (long)(id.number() % 2); }) - .output_product("l1"_in("job")); + .output_product("l1"_in("event")); s.provide("provide_l2", [](data_cell_index const& id) -> long { return 1 - (long)(id.number() % 2); }) - .output_product("l2"_in("job")); + .output_product("l2"_in("event")); s.provide( "provide_ul1", [](data_cell_index const& id) -> unsigned long { return (unsigned long)(id.number() % 101); }) - .output_product("ul1"_in("job")); + .output_product("ul1"_in("event")); s.provide("provide_ul2", [](data_cell_index const& id) -> unsigned long { return 100 - (unsigned long)(id.number() % 101); }) - .output_product("ul2"_in("job")); + .output_product("ul2"_in("event")); s.provide("provide_b1", [](data_cell_index const& id) -> bool { return (id.number() % 2) == 0; }) - .output_product("b1"_in("job")); + .output_product("b1"_in("event")); s.provide("provide_b2", [](data_cell_index const& id) -> bool { return (id.number() % 2) != 0; }) - .output_product("b2"_in("job")); + .output_product("b2"_in("event")); }