diff --git a/ddtrace/internal/core/event_hub.py b/ddtrace/internal/core/event_hub.py index f7bd66a55c0..34e3429e852 100644 --- a/ddtrace/internal/core/event_hub.py +++ b/ddtrace/internal/core/event_hub.py @@ -1,139 +1,31 @@ -import dataclasses -import enum -from typing import Any -from typing import Callable -from typing import Optional - -from ddtrace.internal.settings._config import config - - -_listeners: dict[str, dict[Any, Callable[..., Any]]] = {} - - -class ResultType(enum.Enum): - RESULT_OK = 0 - RESULT_EXCEPTION = 1 - RESULT_UNDEFINED = -1 - - -@dataclasses.dataclass -class EventResult: - response_type: ResultType = ResultType.RESULT_UNDEFINED - value: Any = None - exception: Optional[Exception] = None - - def __bool__(self): - "EventResult can easily be checked as a valid result" - return self.response_type == ResultType.RESULT_OK - - -_MissingEvent = EventResult() - - -class EventResultDict(dict[str, EventResult]): - def __missing__(self, key: str) -> EventResult: - return _MissingEvent - - def __getattr__(self, name: str) -> EventResult: - return dict.__getitem__(self, name) - - -_MissingEventDict = EventResultDict() - - -def has_listeners(event_id: str) -> bool: - """Check if there are hooks registered for the provided event_id""" - global _listeners - return bool(_listeners.get(event_id)) - - -def on(event_id: str, callback: Callable[..., Any], name: Any = None) -> None: - """Register a listener for the provided event_id""" - global _listeners - if name is None: - name = id(callback) - if event_id not in _listeners: - _listeners[event_id] = {} - _listeners[event_id][name] = callback - - -def reset(event_id: Optional[str] = None, callback: Optional[Callable[..., Any]] = None) -> None: - """Remove all registered listeners. If an event_id is provided, only clear those - event listeners. If a callback is provided, then only the listeners for that callback are removed. - """ - global _listeners - - if callback: - if event_id in _listeners: - _listeners[event_id] = {name: cb for name, cb in _listeners[event_id].items() if cb != callback} - else: - if not event_id: - _listeners.clear() - elif event_id in _listeners: - del _listeners[event_id] - - -def dispatch_event(event, allow_raise: bool = False) -> None: +from ddtrace.internal.core.events import Event +from ddtrace.internal.native._native import EventResult +from ddtrace.internal.native._native import EventResultDict +from ddtrace.internal.native._native import ResultType +from ddtrace.internal.native._native import dispatch +from ddtrace.internal.native._native import dispatch_with_results +from ddtrace.internal.native._native import has_listeners +from ddtrace.internal.native._native import on +from ddtrace.internal.native._native import reset + + +def dispatch_event(event: Event, allow_raise: bool = False) -> None: """Call all hooks for the provided event. - When ``allow_raise=True``, listener ``Exception``s propagate to the caller - (the first listener to raise wins; subsequent listeners are skipped). - ``BaseException``-derived exceptions (including ``DDBlockException``) - always propagate regardless of ``allow_raise``. - - PERF: Avoid calling `dispatch` to reduce function calls/overhead of this function. - """ - global _listeners - - event_id = event.event_name - if event_id not in _listeners: - return - - for local_hook in _listeners[event_id].values(): - try: - local_hook(event) - except Exception: - if allow_raise or config._raise: - raise - - -def dispatch(event_id: str, args: tuple[Any, ...] = (), allow_raise: bool = False) -> None: - """Call all hooks for the provided event_id with the provided args. - - When ``allow_raise=True``, listener ``Exception``s propagate to the caller - (the first listener to raise wins; subsequent listeners are skipped). - ``BaseException``-derived exceptions (including ``DDBlockException``) - always propagate regardless of ``allow_raise``. - """ - global _listeners - - if event_id not in _listeners: - return - - for local_hook in _listeners[event_id].values(): - try: - local_hook(*args) - except Exception: - if allow_raise or config._raise: - raise - - -def dispatch_with_results(event_id: str, args: tuple[Any, ...] = ()) -> EventResultDict: - """Call all hooks for the provided event_id with the provided args - returning the results and exceptions from the called hooks + When ``allow_raise=True``, listener ``Exception``s propagate to the caller. + ``BaseException``-derived exceptions always propagate regardless of ``allow_raise``. """ - global _listeners - - if event_id not in _listeners: - return _MissingEventDict - - results = EventResultDict() - for name, hook in _listeners[event_id].items(): - try: - results[name] = EventResult(ResultType.RESULT_OK, hook(*args)) - except Exception as e: - if config._raise: - raise - results[name] = EventResult(ResultType.RESULT_EXCEPTION, None, e) - - return results + dispatch(event.event_name, (event,), allow_raise) + + +__all__ = [ + "EventResult", + "EventResultDict", + "ResultType", + "dispatch", + "dispatch_event", + "dispatch_with_results", + "has_listeners", + "on", + "reset", +] diff --git a/ddtrace/internal/native/_native.pyi b/ddtrace/internal/native/_native.pyi index 648a620aba8..134d9caa2b8 100644 --- a/ddtrace/internal/native/_native.pyi +++ b/ddtrace/internal/native/_native.pyi @@ -695,6 +695,39 @@ class SpanLink: def __repr__(self) -> str: ... def __reduce__(self) -> tuple: ... +class ResultType: + value: int + name: str + RESULT_OK: "ResultType" + RESULT_EXCEPTION: "ResultType" + RESULT_UNDEFINED: "ResultType" + def __eq__(self, other: object) -> bool: ... + def __hash__(self) -> int: ... + def __repr__(self) -> str: ... + def __int__(self) -> int: ... + +class EventResult: + response_type: Any + value: Any + exception: Any + def __init__( + self, + response_type: Any = None, + value: Any = None, + exception: Any = None, + ) -> None: ... + def __bool__(self) -> bool: ... + def __repr__(self) -> str: ... + +class EventResultDict(dict): + def __missing__(self, key: Any) -> EventResult: ... + def __getattr__(self, name: str) -> EventResult: ... + +def has_listeners(event_id: str) -> bool: ... +def on(event_id: str, callback: Any, name: Any = None) -> None: ... +def reset(event_id: Optional[str] = None, callback: Optional[Any] = None) -> None: ... +def dispatch(event_id: str, args: Optional[tuple] = None, allow_raise: bool = False) -> None: ... +def dispatch_with_results(event_id: str, args: Optional[tuple] = None) -> EventResultDict: ... def flatten_key_value(root_key: str, value: Any) -> dict[str, Any]: ... def is_sequence(obj: Any) -> bool: ... def seed() -> None: ... diff --git a/src/native/event_hub.rs b/src/native/event_hub.rs new file mode 100644 index 00000000000..f1e881c1a04 --- /dev/null +++ b/src/native/event_hub.rs @@ -0,0 +1,451 @@ +use pyo3::{ + prelude::*, + types::{PyDict, PyList, PyTuple}, + PyTraverseError, PyVisit, +}; +use std::collections::HashMap; +use std::sync::{LazyLock, OnceLock, RwLock}; + +type Listeners = HashMap, Py)>>; + +// (name_key, callback) pairs per event_id +static LISTENERS: LazyLock> = LazyLock::new(|| RwLock::new(HashMap::new())); + +// Cached Python objects — initialized on first use, never invalidated. +// The losing thread in a race drops its Py; pyo3 0.28 defers the decref. +static MISSING_EVENT: OnceLock> = OnceLock::new(); +static MISSING_EVENT_DICT: OnceLock> = OnceLock::new(); +// ResultType::Ok/Exception interned once so dispatch_with_results avoids per-listener allocation. +static RT_OK_PY: OnceLock> = OnceLock::new(); +static RT_EXCEPTION_PY: OnceLock> = OnceLock::new(); + +// OnceLock::get_or_try_init is unstable; use get+set pattern instead. +macro_rules! get_or_init { + ($cell:expr, $py:expr, $init:expr) => {{ + if let Some(val) = $cell.get() { + Ok(val) + } else { + let val: PyResult> = $init; + match val { + Ok(v) => { + let _ = $cell.set(v); + Ok($cell.get().unwrap()) + } + Err(e) => Err(e), + } + } + }}; +} + +/// Native equivalent of the Python ResultType enum. +/// pyo3 automatically exposes each variant as a class attribute. +#[pyclass( + eq, + hash, + frozen, + from_py_object, + module = "ddtrace.internal.native._native" +)] +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum ResultType { + #[pyo3(name = "RESULT_OK")] + Ok = 0, + #[pyo3(name = "RESULT_EXCEPTION")] + Exception = 1, + #[pyo3(name = "RESULT_UNDEFINED")] + Undefined = -1, +} + +#[pymethods] +impl ResultType { + #[getter] + fn value(&self) -> i32 { + match self { + ResultType::Ok => 0, + ResultType::Exception => 1, + ResultType::Undefined => -1, + } + } + + #[getter] + fn name(&self) -> &'static str { + match self { + ResultType::Ok => "RESULT_OK", + ResultType::Exception => "RESULT_EXCEPTION", + ResultType::Undefined => "RESULT_UNDEFINED", + } + } + + fn __repr__(&self) -> String { + format!("", self.name(), self.value()) + } + + fn __int__(&self) -> i32 { + self.value() + } +} + +fn get_missing_event(py: Python<'_>) -> PyResult<&'static Py> { + get_or_init!(MISSING_EVENT, py, { + let result_type_undefined = ResultType::Undefined.into_pyobject(py)?.into_any().unbind(); + let event_result = Py::new( + py, + EventResult { + response_type: Some(result_type_undefined), + value: None, + exception: None, + is_ok: false, + }, + )?; + Ok(event_result.into_any()) + }) +} + +fn get_missing_event_dict(py: Python<'_>) -> PyResult<&'static Py> { + get_or_init!(MISSING_EVENT_DICT, py, { + Ok(Py::new(py, EventResultDict)?.into_any()) + }) +} + +fn get_rt_ok(py: Python<'_>) -> PyResult<&'static Py> { + get_or_init!(RT_OK_PY, py, { + Ok(ResultType::Ok.into_pyobject(py)?.into_any().unbind()) + }) +} + +fn get_rt_exception(py: Python<'_>) -> PyResult<&'static Py> { + get_or_init!(RT_EXCEPTION_PY, py, { + Ok(ResultType::Exception.into_pyobject(py)?.into_any().unbind()) + }) +} + +fn repr_field(py: Python<'_>, field: &Option>) -> PyResult { + Ok(match field { + Some(o) => o.bind(py).repr()?.to_string(), + None => "None".to_string(), + }) +} + +/// Python-exported result of a single event listener invocation. +/// is_ok is a private fast path for __bool__ that avoids Python equality. +/// response_type is get-only to keep is_ok consistent; value and exception are mutable. +#[pyclass(module = "ddtrace.internal.native._native")] +pub struct EventResult { + #[pyo3(get)] + pub response_type: Option>, + #[pyo3(get, set)] + pub value: Option>, + #[pyo3(get, set)] + pub exception: Option>, + is_ok: bool, +} + +#[pymethods] +impl EventResult { + #[new] + #[pyo3(signature = (response_type=None, value=None, exception=None))] + fn py_new( + py: Python<'_>, + response_type: Option>, + value: Option>, + exception: Option>, + ) -> Self { + let is_ok = response_type + .as_ref() + .and_then(|rt| rt.extract::(py).ok()) + .map(|rt| rt == ResultType::Ok) + .unwrap_or(false); + Self { + response_type: Some(response_type.unwrap_or_else(|| py.None().into_any())), + value: Some(value.unwrap_or_else(|| py.None().into_any())), + exception: Some(exception.unwrap_or_else(|| py.None().into_any())), + is_ok, + } + } + + fn __bool__(&self) -> bool { + self.is_ok + } + + fn __repr__(&self, py: Python<'_>) -> PyResult { + let rt = repr_field(py, &self.response_type)?; + let val = repr_field(py, &self.value)?; + let exc = repr_field(py, &self.exception)?; + Ok(format!( + "EventResult(response_type={rt}, value={val}, exception={exc})" + )) + } + + fn __traverse__(&self, visit: PyVisit<'_>) -> Result<(), PyTraverseError> { + if let Some(ref o) = self.response_type { + visit.call(o)?; + } + if let Some(ref o) = self.value { + visit.call(o)?; + } + if let Some(ref o) = self.exception { + visit.call(o)?; + } + Ok(()) + } + + fn __clear__(&mut self) { + self.response_type = None; + self.value = None; + self.exception = None; + self.is_ok = false; + } +} + +/// dict subclass mapping listener name keys to EventResult values. +/// __missing__ returns the _MissingEvent singleton for absent keys. +/// __getattr__ enables attribute-style access: result.listener_name. +/// GC is inherited from PyDict — no extra fields to traverse or clear. +#[pyclass(extends=PyDict, module = "ddtrace.internal.native._native")] +pub struct EventResultDict; + +#[pymethods] +impl EventResultDict { + #[new] + fn py_new() -> Self { + EventResultDict + } + + fn __missing__(&self, key: Bound<'_, PyAny>) -> PyResult> { + let py = key.py(); + Ok(get_missing_event(py)?.clone_ref(py)) + } + + fn __getattr__(slf: Bound<'_, Self>, name: &str) -> PyResult> { + let py = slf.py(); + match slf.as_any().cast::()?.get_item(name)? { + Some(v) => Ok(v.unbind()), + None => Ok(get_missing_event(py)?.clone_ref(py)), + } + } +} + +// Coerce any Python object to a tuple for use as call args. +// Accepts tuple (fast path, zero copy), list, or any iterable. +// On failure returns an empty tuple rather than propagating an error, +// so a misbehaving caller cannot crash the host application. +fn coerce_to_tuple<'py>(py: Python<'py>, args: Option>) -> Bound<'py, PyTuple> { + let Some(obj) = args else { + return PyTuple::empty(py); + }; + let bound = obj.into_bound(py); + let bound = match bound.cast_into::() { + Ok(t) => return t, + Err(e) => e.into_inner(), + }; + let bound = match bound.cast_into::() { + Ok(l) => return PyTuple::new(py, l.iter()).unwrap_or_else(|_| PyTuple::empty(py)), + Err(e) => e.into_inner(), + }; + // General iterable fallback + let result = bound + .try_iter() + .and_then(|iter| iter.collect::>>()) + .and_then(|v| PyTuple::new(py, v)); + result.unwrap_or_else(|_| PyTuple::empty(py)) +} + +#[pyfunction] +pub fn has_listeners(event_id: &str) -> bool { + LISTENERS + .read() + .unwrap() + .get(event_id) + .map(|v| !v.is_empty()) + .unwrap_or(false) +} + +#[pyfunction] +#[pyo3(signature = (event_id, callback, name=None))] +pub fn on( + py: Python<'_>, + event_id: String, + callback: Py, + name: Option>, +) -> PyResult<()> { + let key: Py = match name { + Some(n) => n, + None => callback.clone_ref(py), + }; + + // Phase 1: snapshot existing keys under the read lock — no Python calls. + // Clone to hold references so objects stay alive during the equality scan below. + let snapshot: Vec> = { + let guard = LISTENERS.read().unwrap(); + guard + .get(&event_id) + .filter(|v| !v.is_empty()) + .map(|v| v.iter().map(|(k, _)| k.clone_ref(py)).collect()) + .unwrap_or_default() + }; + + // Phase 2: equality scan with NO lock held. + // __eq__ can safely reenter the hub here — no lock is held. + let key_bound = key.bind(py); + let mut matched: Option<&Py> = None; + for existing_key in &snapshot { + let existing_bound = existing_key.bind(py); + if existing_bound.is(key_bound) || existing_bound.eq(key_bound)? { + matched = Some(existing_key); + break; + } + } + + // Phase 3: write under lock, identity only — no Python calls. + let mut guard = LISTENERS.write().unwrap(); + let vec = guard.entry(event_id).or_default(); + if let Some(mk) = matched { + let mk_bound = mk.bind(py); + for (existing_key, existing_cb) in vec.iter_mut() { + if existing_key.bind(py).is(mk_bound) { + *existing_cb = callback; + return Ok(()); + } + } + // Matched key was removed by a concurrent reset() — fall through to append. + } + // Identity guard: handles exact-same-object re-registration and concurrent races. + for (existing_key, existing_cb) in vec.iter_mut() { + if existing_key.bind(py).is(key_bound) { + *existing_cb = callback; + return Ok(()); + } + } + vec.push((key, callback)); + Ok(()) +} + +#[pyfunction] +#[pyo3(signature = (event_id=None, callback=None))] +pub fn reset(py: Python<'_>, event_id: Option<&str>, callback: Option>) { + let mut guard = LISTENERS.write().unwrap(); + if let Some(cb) = callback { + if let Some(eid) = event_id { + if let Some(vec) = guard.get_mut(eid) { + vec.retain(|(_, stored_cb)| stored_cb.bind(py).ne(cb.bind(py)).unwrap_or(true)); + } + } + } else if let Some(eid) = event_id { + guard.remove(eid); + } else { + guard.clear(); + } +} + +#[inline(always)] +fn should_propagate(e: &PyErr, py: Python<'_>, allow_raise: bool) -> bool { + // Mirrors `except Exception:` semantics: BaseException subclasses propagate always. + !e.is_instance_of::(py) + || allow_raise + || crate::config::get_raise() +} + +#[pyfunction] +#[pyo3(signature = (event_id, args=None, allow_raise=false))] +pub fn dispatch( + py: Python<'_>, + event_id: &str, + args: Option>, + allow_raise: bool, +) -> PyResult<()> { + let callbacks = { + let guard = LISTENERS.read().unwrap(); + let v = match guard.get(event_id) { + None => return Ok(()), + Some(v) if v.is_empty() => return Ok(()), + Some(v) => v, + }; + v.iter().map(|(_, cb)| cb.clone_ref(py)).collect::>() + }; + + let call_args = coerce_to_tuple(py, args); + + for cb in &callbacks { + if let Err(e) = cb.bind(py).call1(&call_args) { + if should_propagate(&e, py, allow_raise) { + return Err(e); + } + } + } + Ok(()) +} + +/// Returns the _MissingEventDict singleton when no listeners, otherwise an EventResultDict +/// mapping name_key -> EventResult. Returned directly — no Python wrapper needed. +#[pyfunction] +#[pyo3(signature = (event_id, args=None))] +pub fn dispatch_with_results( + py: Python<'_>, + event_id: &str, + args: Option>, +) -> PyResult> { + let entries = { + let guard = LISTENERS.read().unwrap(); + let v = match guard.get(event_id) { + Some(v) if !v.is_empty() => v, + _ => return Ok(get_missing_event_dict(py)?.clone_ref(py)), + }; + v.iter() + .map(|(k, cb)| (k.clone_ref(py), cb.clone_ref(py))) + .collect::>() + }; + + let call_args = coerce_to_tuple(py, args); + + let result_dict_py = Py::new(py, EventResultDict)?; + { + let result_dict_bound = result_dict_py.bind(py); + let dict = result_dict_bound.as_any().cast::()?; + for (key, cb) in entries { + match cb.bind(py).call1(&call_args) { + Ok(value) => { + let event_result = Py::new( + py, + EventResult { + response_type: Some(get_rt_ok(py)?.clone_ref(py)), + value: Some(value.unbind()), + exception: None, + is_ok: true, + }, + )?; + dict.set_item(key.bind(py), event_result.bind(py))?; + } + Err(e) => { + if should_propagate(&e, py, false) { + return Err(e); + } + let exc = e.into_value(py).into_any(); + let event_result = Py::new( + py, + EventResult { + response_type: Some(get_rt_exception(py)?.clone_ref(py)), + value: None, + exception: Some(exc), + is_ok: false, + }, + )?; + dict.set_item(key.bind(py), event_result.bind(py))?; + } + } + } + } + + Ok(result_dict_py.into_any()) +} + +pub fn register_event_hub(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_function(wrap_pyfunction!(has_listeners, m)?)?; + m.add_function(wrap_pyfunction!(on, m)?)?; + m.add_function(wrap_pyfunction!(reset, m)?)?; + m.add_function(wrap_pyfunction!(dispatch, m)?)?; + m.add_function(wrap_pyfunction!(dispatch_with_results, m)?)?; + Ok(()) +} diff --git a/src/native/lib.rs b/src/native/lib.rs index 48b208755ad..fc59bc0da55 100644 --- a/src/native/lib.rs +++ b/src/native/lib.rs @@ -6,6 +6,7 @@ mod config; mod data_pipeline; #[cfg(feature = "stats")] mod ddsketch; +mod event_hub; #[cfg(feature = "ffe")] mod ffe; mod library_config; @@ -51,6 +52,7 @@ fn _native(m: &Bound<'_, PyModule>) -> PyResult<()> { shared_runtime::register_shared_runtime(m)?; data_pipeline::register_data_pipeline(m)?; span::register_native_span(m)?; + event_hub::register_event_hub(m)?; rand::register_rand(m)?; m.add_function(wrap_pyfunction!(utils::flatten_key_value, m)?)?; m.add_function(wrap_pyfunction!(utils::is_sequence, m)?)?; diff --git a/tests/internal/test_context_events_api.py b/tests/internal/test_context_events_api.py index b18ac2e5d6b..ceafcab5622 100644 --- a/tests/internal/test_context_events_api.py +++ b/tests/internal/test_context_events_api.py @@ -239,6 +239,35 @@ def on_type_error(*_): with core.context_with_data("my.cool.context"): pass + def test_core_dispatch_args_list_coerced_to_tuple(self): + """dispatch gracefully accepts a list in place of a tuple.""" + received = [] + + def listener(a, b): + received.append((a, b)) + + core.on("my.cool.event", listener) + core.dispatch("my.cool.event", [1, 2]) # type: ignore[arg-type] # ast-grep-ignore: core-dispatch-list-args-multi + assert received == [(1, 2)] + + def test_core_dispatch_args_invalid_type_does_not_raise(self): + """dispatch with a non-iterable args value calls listener with no args and does not crash.""" + called = [] + + def listener(): + called.append(True) + + core.on("my.cool.event", listener) + # An integer is not iterable — coerce_to_tuple falls back to empty tuple. + core.dispatch("my.cool.event", 42) # type: ignore[arg-type] + assert called == [True] + + def test_core_dispatch_with_results_args_list_coerced_to_tuple(self): + """dispatch_with_results gracefully accepts a list in place of a tuple.""" + core.on("my.cool.event", lambda a, b: a + b, "res") + results = core.dispatch_with_results("my.cool.event", [3, 4]) # type: ignore[arg-type] + assert results.res.value == 7 + @with_config_raise_value(raise_value=False) def test_core_dispatch_allow_raise_propagates_exception(self): def on_runtime_error(*_): @@ -487,3 +516,285 @@ async def create_tasks_func(): assert all(s is witness for s in res) loop.close() assert core.find_item("global_counter")["value"] == nb_threads + + +# ── event hub unit tests ────────────────────────────────────────────────────── + + +@pytest.fixture(autouse=False) +def clean_event_hub(): + core.reset_listeners() + yield + core.reset_listeners() + + +def test_event_result_bool(clean_event_hub): + core.on("test.event", lambda: 42, "ok_res") + results = core.dispatch_with_results("test.event", ()) + assert bool(results.ok_res) is True + assert bool(results.nonexistent_key) is False + + +def test_event_result_dict_missing_key_returns_sentinel(clean_event_hub): + core.on("test.event", lambda: 1, "res") + results = core.dispatch_with_results("test.event", ()) + missing = results["no_such_key"] + assert missing.response_type == core.event_hub.ResultType.RESULT_UNDEFINED + assert missing.value is None + assert bool(missing) is False + + +def test_dispatch_with_results_no_listeners_returns_empty_dict(clean_event_hub): + results = core.dispatch_with_results("test.event.noop", ()) + assert len(results) == 0 + missing = results["anything"] + assert missing.response_type == core.event_hub.ResultType.RESULT_UNDEFINED + + +def test_on_name_deduplication(clean_event_hub): + calls = [] + core.on("test.event", lambda: calls.append("first"), "my_listener") + core.on("test.event", lambda: calls.append("second"), "my_listener") + core.dispatch("test.event", ()) + assert calls == ["second"] + + +def test_on_unnamed_same_func_deduplicates(clean_event_hub): + """Registering the same callable twice without a name replaces the first entry.""" + calls = [] + + def listener(): + calls.append(1) + + core.on("test.event", listener) + core.on("test.event", listener) # same object → replace, not append + core.dispatch("test.event", ()) + assert calls == [1] + + +def test_on_unnamed_different_funcs_not_deduplicated(clean_event_hub): + """Two different callables without a name are both registered.""" + calls = [] + core.on("test.event", lambda: calls.append("first")) + core.on("test.event", lambda: calls.append("second")) + core.dispatch("test.event", ()) + assert calls == ["first", "second"] + + +def test_on_bound_method_without_name_deduplicates(clean_event_hub): + """Registering the same bound method twice without a name deduplicates. + Python creates a new object on every attribute access (a.m is not a.m), but + bound methods compare equal via __eq__ (__func__ + __self__), so the second + registration replaces the first and only one listener fires. + """ + + class Component: + def __init__(self): + self.calls = 0 + + def handle(self): + self.calls += 1 + + obj = Component() + core.on("test.event", obj.handle) # bound method object A + core.on("test.event", obj.handle) # bound method object B — same logical method + core.dispatch("test.event", ()) + assert obj.calls == 1 # deduped — called exactly once + + +def test_reset_listeners_removes_bound_method(clean_event_hub): + """reset_listeners must remove a bound method even though Python creates a new + object on every attribute access (a.method is not a.method). Using pointer + identity instead of Python equality silently no-ops the reset, causing + listeners to accumulate across disable/enable cycles. + """ + + class Listener: + def __init__(self): + self.calls = [] + + def handle(self): + self.calls.append(True) + + obj = Listener() + + # Register via one bound-method object, reset via a different one — same + # logical method, different Python objects. + core.on("test.event", obj.handle) + assert core.has_listeners("test.event") + + core.reset_listeners("test.event", obj.handle) # different object, same method + assert not core.has_listeners("test.event") + + core.dispatch("test.event", ()) + assert obj.calls == [], "listener was not removed; reset used pointer identity" + + +def test_dispatch_base_exception_propagates_regardless_of_raise_flag(clean_event_hub): + """BaseException subclasses must propagate even when config._raise is False. + Dispatch uses `except Exception:` semantics — BaseException is never swallowed. + """ + + class BlockingException(BaseException): + pass + + def listener(): + raise BlockingException("blocked!") + + core.on("test.event", listener) + original = config._raise + config._raise = False + try: + with pytest.raises(BlockingException, match="blocked!"): + core.dispatch("test.event", ()) + finally: + config._raise = original + + +def test_dispatch_with_results_base_exception_propagates_regardless_of_raise_flag(clean_event_hub): + """BaseException subclasses must propagate from dispatch_with_results even when + config._raise is False — they are never stored as EventResult.exception. + """ + + class BlockingException(BaseException): + pass + + def listener(): + raise BlockingException("blocked!") + + core.on("test.event", listener, "res") + original = config._raise + config._raise = False + try: + with pytest.raises(BlockingException, match="blocked!"): + core.dispatch_with_results("test.event", ()) + finally: + config._raise = original + + +def test_on_unnamed_reentrant_eq_does_not_deadlock(clean_event_hub): + """__eq__ called during listener registration must not hold the hub lock. + Both read (has_listeners) and write (on) hub operations from inside __eq__ + must succeed — they would deadlock if __eq__ were called under the write lock. + """ + eq_called = [] + done = threading.Event() + + class ReentrantListener: + def __call__(self, *_): + pass + + def __eq__(self, other): + # Both of these would deadlock if the hub write lock were held here. + core.has_listeners("reentrant.eq.event") + core.on("inner.reentrant.event", lambda: None) + eq_called.append(True) + return NotImplemented + + def __hash__(self): + return id(self) + + first = ReentrantListener() + core.on("reentrant.eq.event", first) + + def register(): + core.on("reentrant.eq.event", ReentrantListener()) + done.set() + + t = threading.Thread(target=register, daemon=True) + t.start() + assert done.wait(timeout=2.0), "on() deadlocked while __eq__ reentered the hub" + assert eq_called, "__eq__ must be called (and must not deadlock)" + + +def test_core_dispatch_concurrent_same_event(clean_event_hub): + """N threads all dispatching the same event simultaneously — no lost dispatches.""" + calls = [0] + + def listener(*_): + calls[0] += 1 + + core.on("concurrent.event", listener) + + threads = 10 + iters = 1000 + exceptions = [] + + def worker(): + try: + for _ in range(iters): + core.dispatch("concurrent.event", ()) + except Exception as e: + exceptions.append(e) + + ts = [threading.Thread(target=worker) for _ in range(threads)] + for t in ts: + t.start() + for t in ts: + t.join() + + assert exceptions == [] + assert calls[0] == threads * iters + + +def test_listener_calls_core_api_reentrantly(clean_event_hub): + """A listener may call core.on / core.dispatch / core.reset without deadlocking.""" + nested_called = [] + + def outer_listener(*_): + # Register, dispatch, then remove a nested listener from inside a dispatch. + core.on("inner.event", lambda *_: nested_called.append(True)) + core.dispatch("inner.event", ()) + core.reset_listeners("inner.event") + + core.on("outer.event", outer_listener) + core.dispatch("outer.event", ()) + + assert nested_called == [True] + assert not core.has_listeners("inner.event") + + +def test_listener_dispatches_same_event_does_not_deadlock(clean_event_hub): + """A listener that dispatches the same event it's registered on must not deadlock. + The read lock is dropped before listeners are invoked, so nested acquisition is safe. + Bounded by a depth counter to avoid RecursionError. + """ + depth = [0] + + def listener(*_): + depth[0] += 1 + if depth[0] < 3: + core.dispatch("reentrant.event", ()) + + core.on("reentrant.event", listener) + core.dispatch("reentrant.event", ()) + + assert depth[0] == 3 + + +def test_reset_listeners_simulate_disable_enable_cycle(clean_event_hub): + """Simulate what LLMObs does on every test: disable() → reset_listeners(), + enable() → on(). If reset silently fails, listeners accumulate and fire + multiple times, corrupting any state that depends on exactly one call. + """ + + class Component: + def __init__(self, tag): + self.calls = [] + self.tag = tag + + def on_event(self): + self.calls.append(self.tag) + + c1 = Component("first") + c2 = Component("second") + + # Cycle 1: register c1, then "disable" (reset), then "enable" c2 + core.on("test.event", c1.on_event) + core.reset_listeners("test.event", c1.on_event) + core.on("test.event", c2.on_event) + + core.dispatch("test.event", ()) + + assert c1.calls == [], "stale listener from cycle 1 still firing" + assert c2.calls == ["second"]