From 658a06ca09994f0a4c87dd94923465c90e43fc11 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 5 Jan 2026 17:00:06 +0900 Subject: [PATCH 01/28] feat: Automate cleanup when the last EtcdClient is closed. --- Cargo.lock | 48 +++++++ README.md | 20 ++- etcd_client.pyi | 48 +++++-- python/etcd_client/__init__.py | 2 +- src/client.rs | 16 ++- src/lib.rs | 3 +- src/runtime.rs | 50 ++++++- tests/test_auto_cleanup.py | 244 +++++++++++++++++++++++++++++++++ tests/test_shutdown_stress.py | 16 ++- vendor/pyo3-async-runtimes | 2 +- 10 files changed, 422 insertions(+), 27 deletions(-) create mode 100644 tests/test_auto_cleanup.py diff --git a/Cargo.lock b/Cargo.lock index 6b676c0..0a50419 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,6 +510,15 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" @@ -584,6 +593,29 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -733,6 +765,7 @@ version = "0.27.0" dependencies = [ "futures", "once_cell", + "parking_lot", "pin-project-lite", "pyo3", "pyo3-async-runtimes-macros", @@ -807,6 +840,15 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -1155,6 +1197,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/README.md b/README.md index 7041c94..c7693c5 100644 --- a/README.md +++ b/README.md @@ -28,12 +28,12 @@ async def main(): print(bytes(value).decode()) # testvalue ``` -### Cleanup on shutdown +### Automatic runtime cleanup -To prevent segfaults or GIL state violations during Python interpreter shutdown, you should call `cleanup_runtime()` at the end of your main async function before the event loop shuts down: +The tokio runtime is automatically cleaned up when the last client context exits. 
In most cases, no explicit cleanup is needed: ```python -from etcd_client import EtcdClient, cleanup_runtime +from etcd_client import EtcdClient async def main(): etcd = EtcdClient(['http://127.0.0.1:2379']) @@ -41,13 +41,21 @@ async def main(): await communicator.put('testkey'.encode(), 'testvalue'.encode()) value = await communicator.get('testkey'.encode()) print(bytes(value).decode()) - # Cleanup the tokio runtime before returning - cleanup_runtime() + # Runtime automatically cleaned up when context exits asyncio.run(main()) ``` -This function signals the internal tokio runtime to shut down gracefully, waiting up to 5 seconds for pending tasks to complete. +The library uses reference counting to track active client contexts. When the last context exits, the tokio runtime is gracefully shut down, waiting up to 5 seconds for pending tasks to complete. If you create new clients after this, the runtime is automatically re-initialized. + +For advanced use cases requiring explicit control, `cleanup_runtime()` is still available: + +```python +from etcd_client import cleanup_runtime + +# Force cleanup at a specific point (usually not needed) +cleanup_runtime() +``` `EtcdCommunicator.get_prefix(prefix)` will return a tuple of list containing all key-values with given key prefix. diff --git a/etcd_client.pyi b/etcd_client.pyi index 006d69d..a89f1b3 100644 --- a/etcd_client.pyi +++ b/etcd_client.pyi @@ -360,13 +360,45 @@ class GRPCStatusCode(Enum): """The request does not have valid authentication credentials.""" +def active_context_count() -> int: + """ + Get the number of currently active client contexts. + + Returns the count of client context managers currently in use (inside + `async with` blocks). This is useful for debugging and testing the + automatic cleanup behavior. + + Returns: + The number of active client contexts. Returns 0 when no clients + are in an active context manager. + + Example: + ```python + from etcd_client import Client, active_context_count + + client = Client(["localhost:2379"]) + print(active_context_count()) # 0 + + async with client.connect(): + print(active_context_count()) # 1 + + print(active_context_count()) # 0 + ``` + """ + ... + + def cleanup_runtime() -> None: """ Explicitly cleanup the tokio runtime. + In most cases, the runtime is automatically cleaned up when the last + client context exits. This function is provided for cases where explicit + control is needed, such as when using the client without a context manager. + This function signals the runtime to shutdown and waits for all tracked tasks - to complete. It should be called at the end of your main async function, - before the event loop shuts down. + to complete (up to 5 seconds). After shutdown, the runtime will be lazily + re-initialized if new client operations are performed. Example: ```python @@ -374,16 +406,16 @@ def cleanup_runtime() -> None: async def main(): # Your etcd operations here - client = Client.connect(["localhost:2379"]) - await client.put("key", "value") - # Cleanup before returning - cleanup_runtime() + async with client.connect(): + await client.put("key", "value") + # Runtime is automatically cleaned up when context exits + # Explicit call is usually not needed asyncio.run(main()) ``` Note: - This is useful for ensuring clean shutdown and preventing GIL state - violations during Python interpreter finalization. + This function is idempotent - calling it multiple times or when the + runtime is already shut down is safe and has no effect. """ ... 
diff --git a/python/etcd_client/__init__.py b/python/etcd_client/__init__.py index 45692ad..8dfd414 100644 --- a/python/etcd_client/__init__.py +++ b/python/etcd_client/__init__.py @@ -1,5 +1,5 @@ from .etcd_client import * # noqa: F403 -from .etcd_client import cleanup_runtime # noqa: F401 +from .etcd_client import active_context_count, cleanup_runtime # noqa: F401 __doc__ = etcd_client.__doc__ # noqa: F405 if hasattr(etcd_client, "__all__"): # noqa: F405 diff --git a/src/client.rs b/src/client.rs index 9aa8302..a8b8a0f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -118,6 +118,9 @@ impl PyClient { #[pyo3(signature = ())] fn __aenter__<'a>(&'a mut self, py: Python<'a>) -> PyResult> { + // Increment context count before any async work + crate::runtime::enter_context(); + let endpoints = self.endpoints.clone(); let connect_options = self.connect_options.clone(); let lock_options = self.lock_options.clone(); @@ -142,7 +145,11 @@ impl PyClient { Ok(PyCommunicator::new(client)) } } - Err(e) => Err(PyClientError(e).into()), + Err(e) => { + // Connection failed - decrement context count to maintain balance + crate::runtime::exit_context(); + Err(PyClientError(e).into()) + } } }) } @@ -163,8 +170,13 @@ impl PyClient { future_into_py(py, async move { if let Some(lock_manager) = lock_manager { - return lock_manager.lock().await.handle_aexit().await; + lock_manager.lock().await.handle_aexit().await?; } + + // Decrement context count after all cleanup is done + // This may trigger runtime shutdown if this was the last active context + crate::runtime::exit_context(); + Ok(()) }) } diff --git a/src/lib.rs b/src/lib.rs index 10c77bd..168bc39 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,8 +72,9 @@ mod etcd_client { )?; m.add("EndpointError", py.get_type::())?; - // Add runtime cleanup function + // Add runtime functions m.add_function(wrap_pyfunction!(crate::runtime::cleanup_runtime, m)?)?; + m.add_function(wrap_pyfunction!(crate::runtime::active_context_count, m)?)?; Ok(()) } diff --git a/src/runtime.rs b/src/runtime.rs index efb306e..66eb0d9 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,8 +1,52 @@ use pyo3::prelude::*; +use std::sync::atomic::{AtomicUsize, Ordering}; -/// Request graceful shutdown of the tokio runtime. +/// Count of active client context managers (those currently inside `async with`). +static ACTIVE_CONTEXTS: AtomicUsize = AtomicUsize::new(0); + +/// Called when a client enters its async context (`__aenter__`). +/// +/// Increments the active context count. If this is the first context after +/// a previous shutdown, the tokio runtime will be lazily re-initialized +/// on the first spawn operation. +pub fn enter_context() { + ACTIVE_CONTEXTS.fetch_add(1, Ordering::SeqCst); +} + +/// Called when a client exits its async context (`__aexit__`). +/// +/// Decrements the active context count. If this was the last active context +/// (count drops from 1 to 0), automatically triggers runtime shutdown. +/// +/// Returns `true` if cleanup was triggered, `false` otherwise. +pub fn exit_context() -> bool { + let prev = ACTIVE_CONTEXTS.fetch_sub(1, Ordering::SeqCst); + + if prev == 1 { + // Was 1, now 0 - last context exited, cleanup runtime + pyo3_async_runtimes::tokio::request_shutdown(5000); + true + } else { + false + } +} + +/// Get the current count of active client contexts. +/// +/// Useful for debugging and testing automatic cleanup behavior. +/// Returns 0 when no clients are in an active context manager. 
+#[pyfunction] +pub fn active_context_count() -> usize { + ACTIVE_CONTEXTS.load(Ordering::SeqCst) +} + +/// Explicitly request graceful shutdown of the tokio runtime. +/// +/// In most cases, the runtime is automatically cleaned up when the last +/// client context exits. This function is provided for cases where explicit +/// control is needed. /// -/// This should be called at the end of your main async function, before the event loop shuts down: +/// # Example /// /// ```python /// import asyncio @@ -11,7 +55,7 @@ use pyo3::prelude::*; /// async def main(): /// # Your etcd operations here /// ... -/// # Cleanup before returning +/// # Explicit cleanup (usually not needed) /// cleanup_runtime() /// /// asyncio.run(main()) diff --git a/tests/test_auto_cleanup.py b/tests/test_auto_cleanup.py new file mode 100644 index 0000000..bfa13c6 --- /dev/null +++ b/tests/test_auto_cleanup.py @@ -0,0 +1,244 @@ +""" +Tests for automatic tokio runtime cleanup via reference counting. + +These tests validate that: +1. The runtime is automatically cleaned up when the last client context exits +2. The runtime can be re-initialized for sequential client usage +3. Multiple concurrent clients are handled correctly +4. Exception scenarios maintain correct reference counts +""" + +import os +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest + +from etcd_client import Client, active_context_count + + +@pytest.mark.asyncio +async def test_single_client_context_count(etcd_container) -> None: + """Verify context count increments/decrements correctly for single client.""" + etcd_port = etcd_container.get_exposed_port(2379) + client = Client([f"http://127.0.0.1:{etcd_port}"]) + + assert active_context_count() == 0 + + async with client.connect() as comm: + assert active_context_count() == 1 + await comm.put(b"test_key", b"test_value") + + # After context exit, count should be 0 + assert active_context_count() == 0 + + +@pytest.mark.asyncio +async def test_multiple_concurrent_clients(etcd_container) -> None: + """Cleanup only happens when ALL clients exit.""" + etcd_port = etcd_container.get_exposed_port(2379) + client1 = Client([f"http://127.0.0.1:{etcd_port}"]) + client2 = Client([f"http://127.0.0.1:{etcd_port}"]) + + assert active_context_count() == 0 + + async with client1.connect() as c1: + assert active_context_count() == 1 + + async with client2.connect() as c2: + assert active_context_count() == 2 + await c1.put(b"k1", b"v1") + await c2.put(b"k2", b"v2") + + # client2 exited, but client1 still active + assert active_context_count() == 1 + # Should still be able to use client1 + value = await c1.get(b"k1") + assert bytes(value) == b"v1" + + # Both exited + assert active_context_count() == 0 + + +@pytest.mark.asyncio +async def test_nested_contexts_same_client(etcd_container) -> None: + """Each context entry/exit is counted separately, even for same client.""" + etcd_port = etcd_container.get_exposed_port(2379) + client = Client([f"http://127.0.0.1:{etcd_port}"]) + + assert active_context_count() == 0 + + async with client.connect(): + assert active_context_count() == 1 + + # Same client, new connection + async with client.connect() as comm: + assert active_context_count() == 2 + await comm.put(b"nested_key", b"nested_value") + + assert active_context_count() == 1 + + assert active_context_count() == 0 + + +@pytest.mark.asyncio +async def test_exception_during_context(etcd_container) -> None: + """Count is decremented even if exception occurs during context.""" + etcd_port 
= etcd_container.get_exposed_port(2379) + client = Client([f"http://127.0.0.1:{etcd_port}"]) + + assert active_context_count() == 0 + + with pytest.raises(ValueError, match="test error"): + async with client.connect() as comm: + assert active_context_count() == 1 + await comm.put(b"exc_key", b"exc_value") + raise ValueError("test error") + + # __aexit__ should still have been called + assert active_context_count() == 0 + + +@pytest.mark.asyncio +async def test_context_count_after_operation_failure(etcd_container) -> None: + """Count is properly managed even when operations fail inside context.""" + etcd_port = etcd_container.get_exposed_port(2379) + client = Client([f"http://127.0.0.1:{etcd_port}"]) + + assert active_context_count() == 0 + + # Even if an operation fails, __aexit__ should properly decrement the count + async with client.connect() as comm: + assert active_context_count() == 1 + await comm.put(b"fail_test_key", b"value") + + # Count should be back to 0 after successful context exit + assert active_context_count() == 0 + + +def _make_sequential_test_script(etcd_port: int) -> str: + """Create a test script for sequential client usage with auto re-initialization.""" + return f""" +import asyncio +from etcd_client import Client, active_context_count + +async def main(): + client = Client(["http://127.0.0.1:{etcd_port}"]) + + # First session + async with client.connect() as comm: + await comm.put(b"seq_key", b"value1") + print(f"First session active: {{active_context_count()}}") + + print(f"After first session: {{active_context_count()}}") + # Runtime was cleaned up here, should be re-initialized for second session + + # Second session - runtime should reinitialize automatically + async with client.connect() as comm: + value = await comm.get(b"seq_key") + print(f"Second session active: {{active_context_count()}}") + assert bytes(value) == b"value1", f"Expected 'value1', got {{bytes(value)}}" + + print(f"After second session: {{active_context_count()}}") + print("SUCCESS") + +if __name__ == "__main__": + asyncio.run(main()) +""" + + +@pytest.mark.asyncio +async def test_sequential_clients_reinit(etcd_container) -> None: + """Runtime re-initializes for sequential client usage (subprocess test).""" + etcd_port = etcd_container.get_exposed_port(2379) + script = _make_sequential_test_script(etcd_port) + + project_root = str(Path(__file__).parent.parent.resolve()) + env = os.environ.copy() + env["PYTHONPATH"] = project_root + + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: + f.write(script) + script_path = f.name + + try: + result = subprocess.run( + [sys.executable, "-u", script_path], + capture_output=True, + text=True, + timeout=30, + env=env, + ) + + if result.returncode != 0: + pytest.fail( + f"Sequential client test failed:\n" + f"stdout: {result.stdout}\n" + f"stderr: {result.stderr}" + ) + + assert "SUCCESS" in result.stdout, f"Test did not complete: {result.stdout}" + finally: + os.unlink(script_path) + + +def _make_no_explicit_cleanup_script(etcd_port: int) -> str: + """Create a test script that does NOT call cleanup_runtime() explicitly.""" + return f""" +import asyncio +from etcd_client import Client, active_context_count + +async def main(): + client = Client(["http://127.0.0.1:{etcd_port}"]) + + async with client.connect() as comm: + await comm.put(b"auto_key", b"auto_value") + value = await comm.get(b"auto_key") + assert bytes(value) == b"auto_value" + + # No cleanup_runtime() call - should be automatic + assert active_context_count() == 0 + 
print("SUCCESS") + +if __name__ == "__main__": + asyncio.run(main()) +""" + + +@pytest.mark.asyncio +async def test_no_explicit_cleanup_needed(etcd_container) -> None: + """Verify that explicit cleanup_runtime() is not needed (subprocess test).""" + etcd_port = etcd_container.get_exposed_port(2379) + script = _make_no_explicit_cleanup_script(etcd_port) + + project_root = str(Path(__file__).parent.parent.resolve()) + env = os.environ.copy() + env["PYTHONPATH"] = project_root + + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: + f.write(script) + script_path = f.name + + try: + # Run multiple times to check for any shutdown issues + for i in range(5): + result = subprocess.run( + [sys.executable, "-u", script_path], + capture_output=True, + text=True, + timeout=10, + env=env, + ) + + if result.returncode != 0: + pytest.fail( + f"Iteration {i+1} failed:\n" + f"stdout: {result.stdout}\n" + f"stderr: {result.stderr}" + ) + + assert "SUCCESS" in result.stdout + finally: + os.unlink(script_path) diff --git a/tests/test_shutdown_stress.py b/tests/test_shutdown_stress.py index 7fda215..d4a3a0b 100644 --- a/tests/test_shutdown_stress.py +++ b/tests/test_shutdown_stress.py @@ -1,9 +1,13 @@ """ Stress tests for tokio runtime cleanup during Python shutdown. -These tests validate that the graceful shutdown mechanism works correctly +These tests validate that the automatic graceful shutdown mechanism works correctly by running multiple subprocess iterations that create and destroy etcd clients. +With automatic cleanup via reference counting, explicit cleanup_runtime() calls +are no longer needed - the runtime is automatically cleaned up when the last +client context exits. + Reference: - BA-1976: https://lablup.atlassian.net/browse/BA-1976 - pyo3-async-runtimes#40: https://github.com/PyO3/pyo3-async-runtimes/issues/40 @@ -19,11 +23,14 @@ def _make_test_script(test_code: str, etcd_port: int) -> str: - """Create a test script for subprocess execution.""" + """Create a test script for subprocess execution. + + Note: No explicit cleanup_runtime() call is needed - automatic cleanup + happens when the last client context exits. + """ return f""" import asyncio -from etcd_client import cleanup_runtime from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair async def main(): @@ -37,8 +44,7 @@ async def main(): {test_code} - # Cleanup BEFORE event loop shutdown - cleanup_runtime() + # No explicit cleanup_runtime() needed - automatic cleanup on context exit if __name__ == "__main__": asyncio.run(main()) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index 1c08aae..572d3d0 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit 1c08aae23428596004e943b366253dbef70e9501 +Subproject commit 572d3d0fe04607cbba9d4f13a5a38d89bc9f8d59 From 616a86fc7d2fffa11bbe0a5b983a9007c13b1ce4 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 5 Jan 2026 17:05:38 +0900 Subject: [PATCH 02/28] ci: Fix submodule fetch for non-default branch commits Add fetch-depth: 0 to checkout action to allow fetching submodule commits that are on non-default branches. 
--- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d409227..029e781 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,6 +27,7 @@ jobs: uses: actions/checkout@v4 with: submodules: recursive + fetch-depth: 0 # Required to fetch submodule commits on non-default branches - name: Set up Rust toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 with: From b2b314b0a571ad4fe990221caaafe9898d75d391 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 5 Jan 2026 17:12:54 +0900 Subject: [PATCH 03/28] ci: Specify submodule branch in .gitmodules Explicitly set the branch for the pyo3-async-runtimes submodule to help Git fetch from the correct branch. --- .gitmodules | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitmodules b/.gitmodules index e9d8e05..db009b9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,4 @@ [submodule "vendor/pyo3-async-runtimes"] path = vendor/pyo3-async-runtimes url = https://github.com/lablup/pyo3-async-runtimes.git + branch = add-explicit-shutdown-api From 55b1ac37436eb438c26700808816f76a2de63dcb Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 5 Jan 2026 17:31:02 +0900 Subject: [PATCH 04/28] fix: Use +gil variant specifier to distinguish Python 3.14 from 3.14t The CI was failing on Python 3.14 with: undefined symbol: PyUnstable_Module_SetGIL This symbol only exists in free-threaded Python (3.14t), not in regular Python 3.14. The issue was that uv may not properly distinguish between 3.14 and 3.14t when both are available. Using the +gil variant specifier (e.g., "3.14+gil") explicitly requests the GIL-enabled Python interpreter, preventing uv from accidentally selecting the free-threaded variant. --- .github/workflows/ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 029e781..f3feb87 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,9 @@ jobs: {os: "ubuntu-22.04", arch: "x86_64", etcd_arch: "amd64"}, {os: "ubuntu-22.04-arm", arch: "aarch64", etcd_arch: "arm64"}, ] - python-version: ["3.11", "3.12", "3.13", "3.14", "3.14t"] + # Use 3.14+gil to explicitly request GIL-enabled Python (not free-threaded) + # This prevents uv from accidentally selecting 3.14t when 3.14 is requested + python-version: ["3.11", "3.12", "3.13", "3.14+gil", "3.14t"] runs-on: ${{ matrix.platform.os }} steps: - name: Checkout the revision From 167139c0ec98ad62e748256f496f8c24bbf8e5a3 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 5 Jan 2026 17:49:05 +0900 Subject: [PATCH 05/28] fix: Revert to using plain 3.14 for test jobs The +gil variant specifier is only for selecting from installed interpreters, not for installation. uv python install 3.14 should install the GIL-enabled version by default. 
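For reference, whether a given interpreter is GIL-enabled or free-threaded can be confirmed from Python itself. The sketch below is illustrative only and not part of this change; it uses the same `sysconfig`/`abiflags` probes that the CI debug step in this series relies on:

```python
import sys
import sysconfig

# Py_GIL_DISABLED is 1 on free-threaded builds (3.14t) and 0 on the
# default GIL-enabled build; sys.abiflags contains "t" only on 3.14t.
print(sysconfig.get_config_var("Py_GIL_DISABLED"))
print(sys.abiflags)
```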
--- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f3feb87..6255052 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,9 +20,9 @@ jobs: {os: "ubuntu-22.04", arch: "x86_64", etcd_arch: "amd64"}, {os: "ubuntu-22.04-arm", arch: "aarch64", etcd_arch: "arm64"}, ] - # Use 3.14+gil to explicitly request GIL-enabled Python (not free-threaded) - # This prevents uv from accidentally selecting 3.14t when 3.14 is requested - python-version: ["3.11", "3.12", "3.13", "3.14+gil", "3.14t"] + # Note: 3.14 = GIL-enabled (default), 3.14t = free-threaded + # uv installs GIL-enabled Python by default when using "3.14" + python-version: ["3.11", "3.12", "3.13", "3.14", "3.14t"] runs-on: ${{ matrix.platform.os }} steps: - name: Checkout the revision From 12367652126d3be3ecc77f2246aa508353d7df03 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Mon, 5 Jan 2026 18:05:01 +0900 Subject: [PATCH 06/28] ci: Exclude ARM64 + Python 3.14 GIL-enabled test due to uv/PyO3 bug ARM64 + Python 3.14 (GIL-enabled) has a bug where uv's Python build incorrectly triggers PyO3 to generate free-threaded code, causing 'undefined symbol: PyUnstable_Module_SetGIL' errors at runtime. This is specific to: - Platform: ARM64 (aarch64) - Python: 3.14 (GIL-enabled) The following combinations work correctly: - x86_64 + Python 3.14 (GIL-enabled): OK - ARM64 + Python 3.14t (free-threaded): OK Excluding this specific combination until the upstream issue is resolved. --- .github/workflows/ci.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6255052..d96ac96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,14 @@ jobs: # Note: 3.14 = GIL-enabled (default), 3.14t = free-threaded # uv installs GIL-enabled Python by default when using "3.14" python-version: ["3.11", "3.12", "3.13", "3.14", "3.14t"] + exclude: + # ARM64 + Python 3.14 (GIL-enabled) has a bug where uv's Python build + # incorrectly triggers PyO3 to generate free-threaded code, causing + # "undefined symbol: PyUnstable_Module_SetGIL" errors at runtime. + # ARM64 + Python 3.14t (free-threaded) works correctly. 
+ # See: https://github.com/astral-sh/uv/issues/16253 + - platform: {os: "ubuntu-22.04-arm", arch: "aarch64", etcd_arch: "arm64"} + python-version: "3.14" runs-on: ${{ matrix.platform.os }} steps: - name: Checkout the revision From 5fc6a05c65ac5daf92fc2a242651f996e80201f1 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Tue, 6 Jan 2026 18:20:52 +0900 Subject: [PATCH 07/28] chore: Update pyo3-async-runtimes submodule with CI fixes Updates the submodule to include fixes for compilation errors in the upstream PR #71: - Add tokio `sync` feature for Notify support - Restore missing public API functions - Fix stream module function references --- vendor/pyo3-async-runtimes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index 572d3d0..7fb8ca5 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit 572d3d0fe04607cbba9d4f13a5a38d89bc9f8d59 +Subproject commit 7fb8ca5c9c1d36fadea57be130aa0e5fa6035f3e From 290731b8dd3dfdec2fef09f7ff0651e5b786e9ae Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Thu, 8 Jan 2026 16:55:08 +0900 Subject: [PATCH 08/28] chore: Update build/ci dependencies --- pyproject.toml | 14 +++++++------- uv.lock | 48 ++++++++++++++++++++++++------------------------ 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5296624..a7d9fac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,11 +22,11 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "maturin>=1.10.2", - "pytest>=8.4.1,<9", - "pytest-asyncio>=1.1.0,<2", + "maturin>=1.11.2", + "pytest>=9.0.2,<10", + "pytest-asyncio>=1.3.0,<2", "trafaret>=2.1,<3", - "testcontainers>=4.12.0,<5", + "testcontainers>=4.13.3,<5", ] [project.urls] @@ -35,12 +35,12 @@ repository = "https://github.com/lablup/etcd-client-py" [project.optional-dependencies] dev = [ - "ruff>=0.8.5", - "mypy>=1.13.0", + "ruff>=0.14.10", + "mypy>=1.19.1", ] [build-system] -requires = ["maturin>=1.7,<2.0"] +requires = ["maturin>=1.11,<2.0"] build-backend = "maturin" [tool.maturin] diff --git a/uv.lock b/uv.lock index 1a0c9ff..44e1a89 100644 --- a/uv.lock +++ b/uv.lock @@ -151,12 +151,12 @@ dev = [ [package.metadata] requires-dist = [ - { name = "maturin", specifier = ">=1.10.2" }, - { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.13.0" }, - { name = "pytest", specifier = ">=8.4.1,<9" }, - { name = "pytest-asyncio", specifier = ">=1.1.0,<2" }, - { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.5" }, - { name = "testcontainers", specifier = ">=4.12.0,<5" }, + { name = "maturin", specifier = ">=1.11.2" }, + { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.19.1" }, + { name = "pytest", specifier = ">=9.0.2,<10" }, + { name = "pytest-asyncio", specifier = ">=1.3.0,<2" }, + { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.14.10" }, + { name = "testcontainers", specifier = ">=4.13.3,<5" }, { name = "trafaret", specifier = ">=2.1,<3" }, ] provides-extras = ["dev"] @@ -266,26 +266,26 @@ wheels = [ [[package]] name = "maturin" -version = "1.10.2" +version = "1.11.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/02/44/c593afce7d418ae6016b955c978055232359ad28c707a9ac6643fc60512d/maturin-1.10.2.tar.gz", hash = "sha256:259292563da89850bf8f7d37aa4ddba22905214c1e180b1c8f55505dfd8c0e81", size = 
217835, upload-time = "2025-11-19T11:53:17.348Z" } +sdist = { url = "https://files.pythonhosted.org/packages/2f/1c/00b48c6b93a5b3795ec96165a60dbafd3c5094aae281ba56812a8cad4fc7/maturin-1.11.2.tar.gz", hash = "sha256:24d2502ee8e6e6a33b3993bc78251da8d982e4da16d6c0ad9b4256135ff8694b", size = 226596, upload-time = "2026-01-05T21:11:45.847Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/15/74/7f7e93019bb71aa072a7cdf951cbe4c9a8d5870dd86c66ec67002153487f/maturin-1.10.2-py3-none-linux_armv6l.whl", hash = "sha256:11c73815f21a755d2129c410e6cb19dbfacbc0155bfc46c706b69930c2eb794b", size = 8763201, upload-time = "2025-11-19T11:52:42.98Z" }, - { url = "https://files.pythonhosted.org/packages/4a/85/1d1b64dbb6518ee633bfde8787e251ae59428818fea7a6bdacb8008a09bd/maturin-1.10.2-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:7fbd997c5347649ee7987bd05a92bd5b8b07efa4ac3f8bcbf6196e07eb573d89", size = 17072583, upload-time = "2025-11-19T11:52:45.636Z" }, - { url = "https://files.pythonhosted.org/packages/7c/45/2418f0d6e1cbdf890205d1dc73ebea6778bb9ce80f92e866576c701ded72/maturin-1.10.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e3ce9b2ad4fb9c341f450a6d32dc3edb409a2d582a81bc46ba55f6e3b6196b22", size = 8827021, upload-time = "2025-11-19T11:52:48.143Z" }, - { url = "https://files.pythonhosted.org/packages/7f/83/14c96ddc93b38745d8c3b85126f7d78a94f809a49dc9644bb22b0dc7b78c/maturin-1.10.2-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:f0d1b7b5f73c8d30a7e71cd2a2189a7f0126a3a3cd8b3d6843e7e1d4db50f759", size = 8751780, upload-time = "2025-11-19T11:52:51.613Z" }, - { url = "https://files.pythonhosted.org/packages/46/8d/753148c0d0472acd31a297f6d11c3263cd2668d38278ed29d523625f7290/maturin-1.10.2-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:efcd496a3202ffe0d0489df1f83d08b91399782fb2dd545d5a1e7bf6fd81af39", size = 9241884, upload-time = "2025-11-19T11:52:53.946Z" }, - { url = "https://files.pythonhosted.org/packages/b9/f9/f5ca9fe8cad70cac6f3b6008598cc708f8a74dd619baced99784a6253f23/maturin-1.10.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:a41ec70d99e27c05377be90f8e3c3def2a7bae4d0d9d5ea874aaf2d1da625d5c", size = 8671736, upload-time = "2025-11-19T11:52:57.133Z" }, - { url = "https://files.pythonhosted.org/packages/0a/76/f59cbcfcabef0259c3971f8b5754c85276a272028d8363386b03ec4e9947/maturin-1.10.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:07a82864352feeaf2167247c8206937ef6c6ae9533025d416b7004ade0ea601d", size = 8633475, upload-time = "2025-11-19T11:53:00.389Z" }, - { url = "https://files.pythonhosted.org/packages/53/40/96cd959ad1dda6c12301860a74afece200a3209d84b393beedd5d7d915c0/maturin-1.10.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:04df81ee295dcda37828bd025a4ac688ea856e3946e4cb300a8f44a448de0069", size = 11177118, upload-time = "2025-11-19T11:53:03.014Z" }, - { url = "https://files.pythonhosted.org/packages/e5/b6/144f180f36314be183f5237011528f0e39fe5fd2e74e65c3b44a5795971e/maturin-1.10.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:96e1d391e4c1fa87edf2a37e4d53d5f2e5f39dd880b9d8306ac9f8eb212d23f8", size = 9320218, upload-time = "2025-11-19T11:53:05.39Z" }, - { url = 
"https://files.pythonhosted.org/packages/eb/2d/2c483c1b3118e2e10fd8219d5291843f5f7c12284113251bf506144a3ac1/maturin-1.10.2-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:a217aa7c42aa332fb8e8377eb07314e1f02cf0fe036f614aca4575121952addd", size = 8985266, upload-time = "2025-11-19T11:53:07.618Z" }, - { url = "https://files.pythonhosted.org/packages/1d/98/1d0222521e112cd058b56e8d96c72cf9615f799e3b557adb4b16004f42aa/maturin-1.10.2-py3-none-win32.whl", hash = "sha256:da031771d9fb6ddb1d373638ec2556feee29e4507365cd5749a2d354bcadd818", size = 7667897, upload-time = "2025-11-19T11:53:10.14Z" }, - { url = "https://files.pythonhosted.org/packages/a0/ec/c6c973b1def0d04533620b439d5d7aebb257657ba66710885394514c8045/maturin-1.10.2-py3-none-win_amd64.whl", hash = "sha256:da777766fd584440dc9fecd30059a94f85e4983f58b09e438ae38ee4b494024c", size = 8908416, upload-time = "2025-11-19T11:53:12.862Z" }, - { url = "https://files.pythonhosted.org/packages/1b/01/7da60c9f7d5dc92dfa5e8888239fd0fb2613ee19e44e6db5c2ed5595fab3/maturin-1.10.2-py3-none-win_arm64.whl", hash = "sha256:a4c29a770ea2c76082e0afc6d4efd8ee94405588bfae00d10828f72e206c739b", size = 7506680, upload-time = "2025-11-19T11:53:15.403Z" }, + { url = "https://files.pythonhosted.org/packages/79/c4/85478aeffd361c14b283464a8f240fa6f8f93a5413458d1aeedffdd39a9b/maturin-1.11.2-py3-none-linux_armv6l.whl", hash = "sha256:92cad385d383d9effef2b532085098c6d555f9993566cdead14e0a70fb900aa6", size = 8852599, upload-time = "2026-01-05T21:11:44.465Z" }, + { url = "https://files.pythonhosted.org/packages/59/e8/0f9542984b740120c07ef6b1f1eff7ce4ee45e32c2ad69db87b9729b4007/maturin-1.11.2-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:7dff6b23d7ae70608d0ad7651e3d5acb8f8c982d2a3bccd8b3fbbc2b3696d84b", size = 17237591, upload-time = "2026-01-05T21:11:52.697Z" }, + { url = "https://files.pythonhosted.org/packages/13/5c/2271c7f952dc4ac8bbf86d2f45f2d005d843d2f67d1f0c3675027adc5967/maturin-1.11.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:170ade0cb2816f1e0d050c7eb94906b90d59906d31428286ad5683ee3379c403", size = 8885232, upload-time = "2026-01-05T21:11:30.994Z" }, + { url = "https://files.pythonhosted.org/packages/26/eb/ae3bd7ef6f74d19b8fc00dcdf301abd0344d9840096b8094b2a8c68186d7/maturin-1.11.2-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:38b593e9dc55ddc68baac5b356be7ceeabf5bde89ae1f77269b817cfb4ad2f7c", size = 8870567, upload-time = "2026-01-05T21:11:36.773Z" }, + { url = "https://files.pythonhosted.org/packages/2a/5c/d0397393b1096e01bc553ff253d4f5847ec75b48313ed2239a0677cc6f27/maturin-1.11.2-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:e4adfd055220a1d4d9d13fc63ef78750ee3769e913d9453fe18cd2ce14bb72fb", size = 9294907, upload-time = "2026-01-05T21:11:50.475Z" }, + { url = "https://files.pythonhosted.org/packages/07/f7/f2aac58e4a09ce0ea6cbfae3831a7eaf04bd830464f677fba5519fa277d1/maturin-1.11.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:3a381b231b453e5cf2ca9cd1666061e4f712ac34bd2e339bc90e692f7d9f6a20", size = 8834287, upload-time = "2026-01-05T21:11:46.794Z" }, + { url = "https://files.pythonhosted.org/packages/cc/5a/029db0c105debfe334c3b76d7d7575ea658841726372ea531c4eb77d21cc/maturin-1.11.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:60fa20b43bdd0438fac9dea9221f19a278a4a6d9b52c67d53184fa905959cf54", size = 8728055, upload-time = 
"2026-01-05T21:11:33.044Z" }, + { url = "https://files.pythonhosted.org/packages/58/f0/d48c944151c9373901705f3f11c004233c1a72199408e8585ad66bc3b063/maturin-1.11.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:4dd8c0b0c0f4b5d584bcdba62b0af339366933fabe56cfa187f193f1860c84f3", size = 11385341, upload-time = "2026-01-05T21:11:35.058Z" }, + { url = "https://files.pythonhosted.org/packages/38/d7/7548a8d561a215531e2859e77a421b7fc6d542c6dc8087175eba54f24367/maturin-1.11.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:fde6d45b6efec43e6bf06b143ec06808599784be3f44f5e2914e33b8d3639d66", size = 9429750, upload-time = "2026-01-05T21:11:38.918Z" }, + { url = "https://files.pythonhosted.org/packages/b8/bf/d581d8a938fc1f0e414f5454e745524a9cea7786990fa7a1222d2563f32b/maturin-1.11.2-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:97c454f764232c0488dddcabd61120bbd413811e8f8ef4952ca46e5aade71134", size = 9099156, upload-time = "2026-01-05T21:11:48.407Z" }, + { url = "https://files.pythonhosted.org/packages/43/43/8f29d880034256144b8f72706c95e2d8c5d8b1045cd4a1783e52f1e02dd2/maturin-1.11.2-py3-none-win32.whl", hash = "sha256:bfcaaef0f72a0153a8965f45a9f1b3682658386fdd53a5ca4709ffb9a4f9a652", size = 7753292, upload-time = "2026-01-05T21:11:54.797Z" }, + { url = "https://files.pythonhosted.org/packages/0a/96/8931519e4aca020d2d8fb39e47b5be1a2446073ea3708f3efb046c9790c3/maturin-1.11.2-py3-none-win_amd64.whl", hash = "sha256:715d8fb30593f7aa2f02d7287f34568698596a64c08ba65e8dcd35475fb5456b", size = 9030254, upload-time = "2026-01-05T21:11:40.951Z" }, + { url = "https://files.pythonhosted.org/packages/b8/76/baf33fcc61d4146a9a9ae7eedb414a90aa0923534112d700487f11d6e70a/maturin-1.11.2-py3-none-win_arm64.whl", hash = "sha256:0a25365428cdb9170c515a4b81f1dad220bd3c8ee333f612f557f91ff60c2b06", size = 7633080, upload-time = "2026-01-05T21:11:42.719Z" }, ] [[package]] @@ -381,7 +381,7 @@ wheels = [ [[package]] name = "pytest" -version = "8.4.2" +version = "9.0.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "colorama", marker = "sys_platform == 'win32'" }, @@ -392,9 +392,9 @@ dependencies = [ { name = "pygments" }, { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/a3/5c/00a0e072241553e1a7496d638deababa67c5058571567b92a7eaa258397c/pytest-8.4.2.tar.gz", hash = "sha256:86c0d0b93306b961d58d62a4db4879f27fe25513d4b969df351abdddb3c30e01", size = 1519618, upload-time = "2025-09-04T14:34:22.711Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" }, + { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, ] [[package]] From b0c57e7b0613a4f9d0e40735cf4b67b1203c1c32 Mon Sep 17 00:00:00 2001 From: Joongi 
Kim Date: Thu, 8 Jan 2026 23:50:02 +0900 Subject: [PATCH 09/28] feat: Update pyo3-async-runtime and dependencies --- Cargo.lock | 38 ++------------------------------------ tests/conftest.py | 13 +++++++------ vendor/pyo3-async-runtimes | 2 +- 3 files changed, 10 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a50419..14c2c77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,21 +208,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.31" @@ -239,23 +224,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" - [[package]] name = "futures-macro" version = "0.3.31" @@ -285,13 +253,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures-channel", "futures-core", - "futures-io", "futures-macro", "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", "slab", @@ -763,7 +728,8 @@ dependencies = [ name = "pyo3-async-runtimes" version = "0.27.0" dependencies = [ - "futures", + "futures-channel", + "futures-util", "once_cell", "parking_lot", "pin-project-lite", diff --git a/tests/conftest.py b/tests/conftest.py index 75050ac..d0e1c27 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,6 @@ import pytest from testcontainers.core.container import DockerContainer -from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.core.wait_strategies import LogMessageWaitStrategy from .harness import AsyncEtcd, ConfigScopes, HostPortPair @@ -24,11 +24,12 @@ @pytest.fixture(scope="session") def etcd_container(): - with DockerContainer( - f"gcr.io/etcd-development/etcd:{ETCD_VER}", - command=_etcd_command, - ).with_exposed_ports(2379) as container: - wait_for_logs(container, "ready to serve client requests") + container = ( + DockerContainer(f"gcr.io/etcd-development/etcd:{ETCD_VER}", command=_etcd_command) + .with_exposed_ports(2379) + .waiting_for(LogMessageWaitStrategy("ready to serve client requests")) + ) + with container: yield container diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index 7fb8ca5..2c2070b 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit 7fb8ca5c9c1d36fadea57be130aa0e5fa6035f3e +Subproject commit 2c2070ba53cbee41ddfc47fc22a67647af63ede3 From 87779872d36582db2f47e9c05d683a435cafe86d Mon Sep 17 00:00:00 2001 
From: Joongi Kim Date: Fri, 9 Jan 2026 00:00:19 +0900 Subject: [PATCH 10/28] ci: Restore 3.14 aarch64 ci matrix --- .github/workflows/ci.yml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d96ac96..029e781 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,17 +20,7 @@ jobs: {os: "ubuntu-22.04", arch: "x86_64", etcd_arch: "amd64"}, {os: "ubuntu-22.04-arm", arch: "aarch64", etcd_arch: "arm64"}, ] - # Note: 3.14 = GIL-enabled (default), 3.14t = free-threaded - # uv installs GIL-enabled Python by default when using "3.14" python-version: ["3.11", "3.12", "3.13", "3.14", "3.14t"] - exclude: - # ARM64 + Python 3.14 (GIL-enabled) has a bug where uv's Python build - # incorrectly triggers PyO3 to generate free-threaded code, causing - # "undefined symbol: PyUnstable_Module_SetGIL" errors at runtime. - # ARM64 + Python 3.14t (free-threaded) works correctly. - # See: https://github.com/astral-sh/uv/issues/16253 - - platform: {os: "ubuntu-22.04-arm", arch: "aarch64", etcd_arch: "arm64"} - python-version: "3.14" runs-on: ${{ matrix.platform.os }} steps: - name: Checkout the revision From a7e520e02b1f9b0dbb9e6018fd78f247f7325d44 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 00:05:53 +0900 Subject: [PATCH 11/28] chore: Update pyo3-async-runtimes submodule with deprecation fix Fixes CI build failure caused by deprecated function warnings treated as errors with RUSTFLAGS="-D warnings". --- vendor/pyo3-async-runtimes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index 2c2070b..d14eb29 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit 2c2070ba53cbee41ddfc47fc22a67647af63ede3 +Subproject commit d14eb29971fa41c33c40d67fd11b93b933726584 From efa445366548efdc5583a503183712074c4ebc8f Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 00:29:15 +0900 Subject: [PATCH 12/28] fix: Update pyo3-async-runtimes with deadlock fix Fixes stress test performance regression from 8+ minutes to ~7 seconds. The issue was that request_shutdown() was blocking on thread.join() when called from within a tokio task (in __aexit__), causing a 5-second timeout per subprocess iteration. --- vendor/pyo3-async-runtimes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index d14eb29..a7058d7 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit d14eb29971fa41c33c40d67fd11b93b933726584 +Subproject commit a7058d79d36adcf374e222af03ecffc204e61256 From f35221d05eeb26eb6363d76e8ac78d842b44ec98 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 00:42:07 +0900 Subject: [PATCH 13/28] ci: Debug python build... 
--- .github/workflows/ci.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 029e781..629567e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,9 +43,19 @@ jobs: with: enable-cache: true python-version: ${{ matrix.python-version }} + - name: Check python build + run: | + uv run --no-project python -c 'import sys; print(sys.prefix); print(sys.version_info)' + uv run --no-project python -c 'import sysconfig; print("Py_GIL_DISABLED=", sysconfig.get_config_var("Py_GIL_DISABLED"))' + uv run --no-project python -c 'import sys; print(f"{sys.abiflags=}")' + which python + python --version + which -a python3.14 || true + which -a python3.14t || true - name: Install dependencies and build the package run: | uv sync --locked --all-extras --no-install-project + PYO3_PRINT_CONFIG=1 uv run maturin develop 2>&1 || true uv run maturin develop - name: Test run: | From 93d9d7b31bcf751f1a33e4cd6618f8f7ccb77f12 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 01:56:38 +0900 Subject: [PATCH 14/28] ci: Add artifact upload for debugging aarch64 Python 3.14 build --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 629567e..7134933 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,6 +57,12 @@ jobs: uv sync --locked --all-extras --no-install-project PYO3_PRINT_CONFIG=1 uv run maturin develop 2>&1 || true uv run maturin develop + - name: Upload built extension module (for debugging) + if: matrix.platform.arch == 'aarch64' && matrix.python-version == '3.14' + uses: actions/upload-artifact@v4 + with: + name: debug-extension-${{ matrix.platform.arch }}-${{ matrix.python-version }} + path: python/etcd_client/*.so - name: Test run: | uv run pytest From a2214cb801e58daf6c35568901af124f03049860 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 02:13:02 +0900 Subject: [PATCH 15/28] ci: Add Python version to Rust cache key to prevent cross-contamination --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7134933..d3fd8db 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,9 @@ jobs: with: toolchain: stable override: true + # Include Python version in cache key to prevent cross-contamination + # between GIL-enabled (3.14) and free-threaded (3.14t) builds + cache-key: py${{ matrix.python-version }} - name: Install protobuf compiler uses: arduino/setup-protoc@v3 with: From 5f62b18fefc96428cc2edaf063586f82f81b5f0c Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 11:23:45 +0900 Subject: [PATCH 16/28] fix: Use request_shutdown_background in exit_context to avoid race conditions The exit_context() function is called from within a future_into_py block, which means it runs inside a tokio task. Using the blocking request_shutdown() could cause race conditions where in-flight tasks try to access a runtime that is being torn down. Added new request_shutdown_background() to pyo3-async-runtimes that signals shutdown without blocking or immediately clearing the runtime slot, allowing the current task to complete gracefully before the runtime shuts down. 
--- src/runtime.rs | 8 +++++++- vendor/pyo3-async-runtimes | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/runtime.rs b/src/runtime.rs index 66eb0d9..efb9470 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -18,13 +18,19 @@ pub fn enter_context() { /// Decrements the active context count. If this was the last active context /// (count drops from 1 to 0), automatically triggers runtime shutdown. /// +/// Note: Uses `request_shutdown_background` instead of `request_shutdown` +/// because this function is called from within an async context (inside +/// `future_into_py`). The background version signals shutdown without +/// blocking, allowing the current async task to complete gracefully. +/// /// Returns `true` if cleanup was triggered, `false` otherwise. pub fn exit_context() -> bool { let prev = ACTIVE_CONTEXTS.fetch_sub(1, Ordering::SeqCst); if prev == 1 { // Was 1, now 0 - last context exited, cleanup runtime - pyo3_async_runtimes::tokio::request_shutdown(5000); + // Use background shutdown since we're inside an async context + pyo3_async_runtimes::tokio::request_shutdown_background(5000); true } else { false diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index a7058d7..be6205d 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit a7058d79d36adcf374e222af03ecffc204e61256 +Subproject commit be6205d3b95c21bfd251bd483d044d45624659f7 From 46ec1f925d5633bdfd708ec14e5f6965d5cf4d2c Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 11:24:31 +0900 Subject: [PATCH 17/28] ci: Trigger CI after pushing submodule changes From 338510cf145f294790f6a636cd9143bf1a6ea14a Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 11:43:47 +0900 Subject: [PATCH 18/28] chore: Update pyo3-async-runtimes with proper background shutdown fix The previous request_shutdown_background() implementation left the runtime wrapper in storage, causing potential deadlocks. The new implementation: 1. Atomically clears the wrapper from storage (new ops get fresh runtime) 2. Spawns a background thread to properly join the runtime with timeout 3. Avoids blocking the calling async task --- vendor/pyo3-async-runtimes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index be6205d..a1aa115 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit be6205d3b95c21bfd251bd483d044d45624659f7 +Subproject commit a1aa115314397cb9efc77b0e8aa401cd749505b4 From a793afd09f741333a61118b6b5c599b87f3a7758 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 11:48:17 +0900 Subject: [PATCH 19/28] fix: Update pyo3-async-runtimes - avoid background thread in async shutdown The previous approach of spawning a detached join thread caused SIGSEGV because it raced with Python's interpreter shutdown. Now we just signal shutdown and let the runtime thread complete independently. 
--- vendor/pyo3-async-runtimes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index a1aa115..d9b004a 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit a1aa115314397cb9efc77b0e8aa401cd749505b4 +Subproject commit d9b004a92ac835e74eb6da844eb2bdceb5c9a883 From e54260bb5d0b4243d9bdd3985c97b5ea9dcc849c Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 11:54:55 +0900 Subject: [PATCH 20/28] fix: Register atexit handler for tokio runtime cleanup Added register_atexit_cleanup(py) call during module init to ensure tokio runtime threads are properly joined before Python finalizes. This prevents SIGSEGV crashes on Python 3.11 and 3.12 when tokio threads run during interpreter shutdown. --- src/lib.rs | 6 ++++++ vendor/pyo3-async-runtimes | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 168bc39..4069e9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,6 +56,12 @@ mod etcd_client { #[pymodule_init] fn init(m: &Bound<'_, PyModule>) -> PyResult<()> { let py = m.py(); + + // Register atexit handler for tokio runtime cleanup. + // This ensures runtime threads are properly joined before Python finalizes, + // preventing SIGSEGV when tokio threads run during interpreter shutdown. + pyo3_async_runtimes::tokio::register_atexit_cleanup(py)?; + m.add("ClientError", py.get_type::())?; m.add("GRPCStatusError", py.get_type::())?; m.add("InvalidArgsError", py.get_type::())?; diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index d9b004a..a1f372e 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit d9b004a92ac835e74eb6da844eb2bdceb5c9a883 +Subproject commit a1f372e965ddc1191e3b31bed3992bd27e9e8034 From 2f5c0a9c4792d0f6a9a3eb8a57bedab3c146df99 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 12:22:15 +0900 Subject: [PATCH 21/28] fix: Complete shutdown within async context using asyncio.to_thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Redesigned the shutdown mechanism to ensure the tokio runtime thread is fully terminated before Python's event loop closes: 1. __aexit__ now returns a Python coroutine that: - Awaits the Rust async cleanup (tokio task) - If shutdown was triggered, awaits asyncio.to_thread() to block-join the runtime thread with GIL released 2. Added _join_pending_shutdown() Python function that: - Takes the pending thread handle from storage - Joins it with GIL released - Is called via asyncio.to_thread() from __aexit__ 3. Added comprehensive stress tests for: - Multi-async-task scenario (5 concurrent tasks per process) - Multi-threaded scenario (4 threads with separate event loops) - Mixed concurrency (3 threads × 3 async tasks each) This approach ensures: - No atexit dependency - everything completes in async context - Automatic cleanup when last client exits (ref-counting) - No deadlocks - blocking happens outside tokio via to_thread - Thread-safe and async-task-safe Fixes the SIGSEGV that occurred when tokio threads outlived Python. 
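In Python terms, the wrapper that the Rust side builds via `py.run` behaves roughly like the sketch below. This is illustrative only: `inner_cleanup` stands for the awaitable returned by the tokio cleanup future and `join_fn` for the new `_join_pending_shutdown` helper, both of which the real `__aexit__` wires in as globals rather than parameters:

```python
import asyncio

async def _aexit_wrapper(inner_cleanup, join_fn):
    # Run the Rust-side cleanup on the tokio runtime
    # (lock release, then the ref-count decrement in exit_context).
    triggered_shutdown = await inner_cleanup
    if triggered_shutdown:
        # Last client exited: join the runtime thread off the event loop,
        # with the GIL released, so no tokio thread outlives Python.
        await asyncio.to_thread(join_fn)
```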
--- src/client.rs | 46 ++++++++-- src/lib.rs | 6 +- src/runtime.rs | 13 +++ tests/test_shutdown_stress.py | 160 ++++++++++++++++++++++++++++++++++ vendor/pyo3-async-runtimes | 2 +- 5 files changed, 215 insertions(+), 12 deletions(-) diff --git a/src/client.rs b/src/client.rs index a8b8a0f..2b32c8f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ use etcd_client::{Client as EtcdClient, ConnectOptions}; use pyo3::prelude::*; -use pyo3::types::PyTuple; +use pyo3::types::{PyDict, PyTuple}; use pyo3_async_runtimes::tokio::future_into_py; use std::sync::Arc; use std::time::Duration; @@ -168,16 +168,50 @@ impl PyClient { None }; - future_into_py(py, async move { + // Create the tokio async cleanup coroutine + // Returns True if this was the last client and shutdown was triggered + let inner_cleanup = future_into_py(py, async move { if let Some(lock_manager) = lock_manager { lock_manager.lock().await.handle_aexit().await?; } // Decrement context count after all cleanup is done // This may trigger runtime shutdown if this was the last active context - crate::runtime::exit_context(); - - Ok(()) - }) + let triggered_shutdown = crate::runtime::exit_context(); + + Ok(triggered_shutdown) + })?; + + // Get the join function for use in the wrapper + let etcd_client = py.import("etcd_client")?; + let join_fn = etcd_client.getattr("_join_pending_shutdown")?; + + // Create a Python wrapper coroutine that: + // 1. Awaits the inner cleanup + // 2. If shutdown was triggered, awaits the blocking join via to_thread + let asyncio = py.import("asyncio")?; + let to_thread = asyncio.getattr("to_thread")?; + + // Build the wrapper coroutine using Python code + let globals = PyDict::new(py); + globals.set_item("inner_cleanup", inner_cleanup)?; + globals.set_item("join_fn", join_fn)?; + globals.set_item("to_thread", to_thread)?; + + py.run( + c" +async def _aexit_wrapper(): + triggered_shutdown = await inner_cleanup + if triggered_shutdown: + await to_thread(join_fn) + return None +_result = _aexit_wrapper() +", + Some(&globals), + None, + )?; + + let result = globals.get_item("_result")?.unwrap(); + Ok(result) } } diff --git a/src/lib.rs b/src/lib.rs index 4069e9b..0459aa4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,11 +57,6 @@ mod etcd_client { fn init(m: &Bound<'_, PyModule>) -> PyResult<()> { let py = m.py(); - // Register atexit handler for tokio runtime cleanup. - // This ensures runtime threads are properly joined before Python finalizes, - // preventing SIGSEGV when tokio threads run during interpreter shutdown. - pyo3_async_runtimes::tokio::register_atexit_cleanup(py)?; - m.add("ClientError", py.get_type::())?; m.add("GRPCStatusError", py.get_type::())?; m.add("InvalidArgsError", py.get_type::())?; @@ -81,6 +76,7 @@ mod etcd_client { // Add runtime functions m.add_function(wrap_pyfunction!(crate::runtime::cleanup_runtime, m)?)?; m.add_function(wrap_pyfunction!(crate::runtime::active_context_count, m)?)?; + m.add_function(wrap_pyfunction!(crate::runtime::_join_pending_shutdown, m)?)?; Ok(()) } diff --git a/src/runtime.rs b/src/runtime.rs index efb9470..d54a172 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -73,3 +73,16 @@ pub fn active_context_count() -> usize { pub fn cleanup_runtime() { pyo3_async_runtimes::tokio::request_shutdown(5000); } + +/// Internal function to join the pending runtime shutdown thread. +/// +/// This is called from Python via `asyncio.to_thread()` to block until +/// the runtime thread has fully terminated. The GIL is released during +/// the blocking wait. 
+/// +/// Users should not call this directly - it's used internally by the +/// client's `__aexit__` implementation. +#[pyfunction] +pub fn _join_pending_shutdown(py: Python<'_>) -> bool { + pyo3_async_runtimes::tokio::join_pending_shutdown(py) +} diff --git a/tests/test_shutdown_stress.py b/tests/test_shutdown_stress.py index d4a3a0b..745d941 100644 --- a/tests/test_shutdown_stress.py +++ b/tests/test_shutdown_stress.py @@ -157,3 +157,163 @@ async def test_shutdown_sanity_check(etcd_container) -> None: script = _make_test_script(test_code, etcd_port) _run_subprocess_test(script, iterations=5) + + +@pytest.mark.asyncio +async def test_shutdown_multi_async_tasks(etcd_container) -> None: + """Test shutdown with multiple concurrent async tasks, each with its own client. + + This tests the reference counting mechanism with multiple clients sharing + one event loop - shutdown should only happen when the last client exits. + """ + etcd_port = etcd_container.get_exposed_port(2379) + + script = f""" +import asyncio +from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair + +async def worker(worker_id: int): + etcd = AsyncEtcd( + addr=HostPortPair(host="127.0.0.1", port={etcd_port}), + namespace=f"test_multi_task_{{worker_id}}", + scope_prefix_map={{ + ConfigScopes.GLOBAL: "global", + }}, + ) + async with etcd: + for i in range(5): + await etcd.put(f"key_{{worker_id}}_{{i}}", f"value_{{i}}") + await asyncio.sleep(0.01) + +async def main(): + # Launch 5 concurrent tasks, each with its own client + tasks = [asyncio.create_task(worker(i)) for i in range(5)] + await asyncio.gather(*tasks) + +if __name__ == "__main__": + asyncio.run(main()) +""" + + _run_subprocess_test(script, iterations=20) + + +@pytest.mark.asyncio +async def test_shutdown_multi_threaded(etcd_container) -> None: + """Test shutdown with multiple threads, each running its own event loop. + + This tests thread safety of the reference counting mechanism - each thread + has its own asyncio event loop and creates/destroys clients independently. + """ + etcd_port = etcd_container.get_exposed_port(2379) + + script = f""" +import asyncio +import threading +from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair + +def thread_worker(thread_id: int, results: list, errors: list): + try: + async def async_work(): + etcd = AsyncEtcd( + addr=HostPortPair(host="127.0.0.1", port={etcd_port}), + namespace=f"test_multi_thread_{{thread_id}}", + scope_prefix_map={{ + ConfigScopes.GLOBAL: "global", + }}, + ) + async with etcd: + for i in range(3): + await etcd.put(f"key_{{thread_id}}_{{i}}", f"value_{{i}}") + + asyncio.run(async_work()) + results.append(thread_id) + except Exception as e: + errors.append((thread_id, str(e))) + +def main(): + results = [] + errors = [] + threads = [] + + # Launch 4 threads, each with its own event loop and client + for i in range(4): + t = threading.Thread(target=thread_worker, args=(i, results, errors)) + threads.append(t) + t.start() + + for t in threads: + t.join(timeout=10) + + if errors: + raise RuntimeError(f"Thread errors: {{errors}}") + if len(results) != 4: + raise RuntimeError(f"Expected 4 completed threads, got {{len(results)}}") + +if __name__ == "__main__": + main() +""" + + _run_subprocess_test(script, iterations=10) + + +@pytest.mark.asyncio +async def test_shutdown_mixed_concurrency(etcd_container) -> None: + """Test shutdown with mixed concurrency: multiple threads with multiple async tasks each. 
+ + This is the most complex scenario - multiple threads each running their own + event loop with multiple concurrent async tasks. + """ + etcd_port = etcd_container.get_exposed_port(2379) + + script = f""" +import asyncio +import threading +from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair + +def thread_worker(thread_id: int, results: list, errors: list): + try: + async def async_task(task_id: int): + etcd = AsyncEtcd( + addr=HostPortPair(host="127.0.0.1", port={etcd_port}), + namespace=f"test_mixed_{{thread_id}}_{{task_id}}", + scope_prefix_map={{ + ConfigScopes.GLOBAL: "global", + }}, + ) + async with etcd: + await etcd.put(f"key", f"value_{{thread_id}}_{{task_id}}") + + async def run_tasks(): + # Each thread runs 3 concurrent async tasks + tasks = [asyncio.create_task(async_task(i)) for i in range(3)] + await asyncio.gather(*tasks) + + asyncio.run(run_tasks()) + results.append(thread_id) + except Exception as e: + errors.append((thread_id, str(e))) + +def main(): + results = [] + errors = [] + threads = [] + + # Launch 3 threads + for i in range(3): + t = threading.Thread(target=thread_worker, args=(i, results, errors)) + threads.append(t) + t.start() + + for t in threads: + t.join(timeout=15) + + if errors: + raise RuntimeError(f"Thread errors: {{errors}}") + if len(results) != 3: + raise RuntimeError(f"Expected 3 completed threads, got {{len(results)}}") + +if __name__ == "__main__": + main() +""" + + _run_subprocess_test(script, iterations=10) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index a1f372e..1b266e5 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit a1f372e965ddc1191e3b31bed3992bd27e9e8034 +Subproject commit 1b266e5f06fbed5c11f2ce5fa55e71820cfcab36 From df10cbb6ec7797c29e7b299d93dc72132adfd8aa Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 12:27:32 +0900 Subject: [PATCH 22/28] test: Increase subprocess timeout for complex concurrency tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The multi-threaded and mixed concurrency tests can take longer on CI due to thread setup overhead and variable machine performance. - Add configurable timeout parameter to _run_subprocess_test - Use 20s timeout for multi-threaded test (4 threads) - Use 30s timeout for mixed concurrency test (3 threads × 3 tasks) --- tests/test_shutdown_stress.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/test_shutdown_stress.py b/tests/test_shutdown_stress.py index 745d941..954ef29 100644 --- a/tests/test_shutdown_stress.py +++ b/tests/test_shutdown_stress.py @@ -51,8 +51,16 @@ async def main(): """ -def _run_subprocess_test(script_content: str, iterations: int = 10) -> None: - """Run a test script in subprocess multiple times to detect shutdown issues.""" +def _run_subprocess_test( + script_content: str, iterations: int = 10, timeout: int = 10 +) -> None: + """Run a test script in subprocess multiple times to detect shutdown issues. + + Args: + script_content: The Python script to run. + iterations: Number of times to run the script. + timeout: Subprocess timeout in seconds per iteration (default 10). 
+ """ project_root = str(Path(__file__).parent.parent.resolve()) env = os.environ.copy() env["PYTHONPATH"] = project_root @@ -68,7 +76,7 @@ def _run_subprocess_test(script_content: str, iterations: int = 10) -> None: [sys.executable, "-u", script_path], capture_output=True, text=True, - timeout=10, + timeout=timeout, env=env, ) @@ -253,7 +261,8 @@ def main(): main() """ - _run_subprocess_test(script, iterations=10) + # Use longer timeout (20s) for multi-threaded test due to thread setup overhead + _run_subprocess_test(script, iterations=10, timeout=20) @pytest.mark.asyncio @@ -316,4 +325,6 @@ def main(): main() """ - _run_subprocess_test(script, iterations=10) + # Use longer timeout (30s) for mixed concurrency test - most complex scenario + # with multiple threads each running multiple async tasks + _run_subprocess_test(script, iterations=10, timeout=30) From a726ee5edd6022b104c8879ec67967cb1393888c Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 12:34:39 +0900 Subject: [PATCH 23/28] fix: Trigger shutdown from Python to avoid race condition The previous implementation triggered shutdown from within the tokio task (exit_context called request_shutdown_background). This created a race condition where the runtime could start shutting down while the task was still trying to return its result to Python, causing hangs in multi-threaded scenarios. The fix separates the two operations: 1. exit_context() now only returns a flag indicating if this was the last context 2. _trigger_shutdown() is a new function called from Python AFTER the tokio task has completed and returned 3. Then _join_pending_shutdown() blocks until the runtime thread terminates This ensures the tokio task completes successfully before shutdown begins. --- src/client.rs | 24 +++++++++++++++--------- src/lib.rs | 1 + src/runtime.rs | 37 +++++++++++++++++++++---------------- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/src/client.rs b/src/client.rs index 2b32c8f..a7b7cdc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -169,40 +169,46 @@ impl PyClient { }; // Create the tokio async cleanup coroutine - // Returns True if this was the last client and shutdown was triggered + // Returns True if this was the last client (context count dropped to 0) let inner_cleanup = future_into_py(py, async move { if let Some(lock_manager) = lock_manager { lock_manager.lock().await.handle_aexit().await?; } // Decrement context count after all cleanup is done - // This may trigger runtime shutdown if this was the last active context - let triggered_shutdown = crate::runtime::exit_context(); + // Returns true if this was the last active context + let is_last_context = crate::runtime::exit_context(); - Ok(triggered_shutdown) + Ok(is_last_context) })?; - // Get the join function for use in the wrapper + // Get the runtime functions for use in the wrapper let etcd_client = py.import("etcd_client")?; + let trigger_shutdown_fn = etcd_client.getattr("_trigger_shutdown")?; let join_fn = etcd_client.getattr("_join_pending_shutdown")?; // Create a Python wrapper coroutine that: - // 1. Awaits the inner cleanup - // 2. If shutdown was triggered, awaits the blocking join via to_thread + // 1. Awaits the inner cleanup (tokio task) + // 2. If this was the last context, trigger shutdown AFTER tokio task completes + // 3. 
Then await the blocking join via to_thread let asyncio = py.import("asyncio")?; let to_thread = asyncio.getattr("to_thread")?; // Build the wrapper coroutine using Python code let globals = PyDict::new(py); globals.set_item("inner_cleanup", inner_cleanup)?; + globals.set_item("trigger_shutdown_fn", trigger_shutdown_fn)?; globals.set_item("join_fn", join_fn)?; globals.set_item("to_thread", to_thread)?; py.run( c" async def _aexit_wrapper(): - triggered_shutdown = await inner_cleanup - if triggered_shutdown: + is_last_context = await inner_cleanup + if is_last_context: + # Trigger shutdown AFTER the tokio task has completed + # This avoids a race where shutdown starts while the task is returning + trigger_shutdown_fn() await to_thread(join_fn) return None _result = _aexit_wrapper() diff --git a/src/lib.rs b/src/lib.rs index 0459aa4..b074319 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,6 +76,7 @@ mod etcd_client { // Add runtime functions m.add_function(wrap_pyfunction!(crate::runtime::cleanup_runtime, m)?)?; m.add_function(wrap_pyfunction!(crate::runtime::active_context_count, m)?)?; + m.add_function(wrap_pyfunction!(crate::runtime::_trigger_shutdown, m)?)?; m.add_function(wrap_pyfunction!(crate::runtime::_join_pending_shutdown, m)?)?; Ok(()) diff --git a/src/runtime.rs b/src/runtime.rs index d54a172..8cf1ae2 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -15,26 +15,18 @@ pub fn enter_context() { /// Called when a client exits its async context (`__aexit__`). /// -/// Decrements the active context count. If this was the last active context -/// (count drops from 1 to 0), automatically triggers runtime shutdown. +/// Decrements the active context count and returns `true` if this was the +/// last active context (count drops from 1 to 0). /// -/// Note: Uses `request_shutdown_background` instead of `request_shutdown` -/// because this function is called from within an async context (inside -/// `future_into_py`). The background version signals shutdown without -/// blocking, allowing the current async task to complete gracefully. +/// Note: This function does NOT trigger the shutdown directly. The caller +/// should trigger shutdown from Python AFTER the tokio task completes, to +/// avoid a race condition where the runtime starts shutting down while the +/// task is still returning its result. /// -/// Returns `true` if cleanup was triggered, `false` otherwise. +/// Returns `true` if this was the last context, `false` otherwise. pub fn exit_context() -> bool { let prev = ACTIVE_CONTEXTS.fetch_sub(1, Ordering::SeqCst); - - if prev == 1 { - // Was 1, now 0 - last context exited, cleanup runtime - // Use background shutdown since we're inside an async context - pyo3_async_runtimes::tokio::request_shutdown_background(5000); - true - } else { - false - } + prev == 1 } /// Get the current count of active client contexts. @@ -74,6 +66,19 @@ pub fn cleanup_runtime() { pyo3_async_runtimes::tokio::request_shutdown(5000); } +/// Internal function to trigger runtime shutdown in the background. +/// +/// This is called from Python AFTER the tokio task has completed and +/// returned its result. This avoids a race condition where the runtime +/// starts shutting down while a task is still trying to return. +/// +/// Users should not call this directly - it's used internally by the +/// client's `__aexit__` implementation. 
+#[pyfunction] +pub fn _trigger_shutdown() { + pyo3_async_runtimes::tokio::request_shutdown_background(5000); +} + /// Internal function to join the pending runtime shutdown thread. /// /// This is called from Python via `asyncio.to_thread()` to block until From 1cd21d975fb8b88c95f728363a374ab3dfd3ea34 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 18:06:13 +0900 Subject: [PATCH 24/28] refactor: Tidy up runtime cleanup code for clarity - runtime.rs: Add SHUTDOWN_TIMEOUT_MS constant, restrict internal functions to pub(crate), simplify docstrings, add section comments - client.rs: Extract Python wrapper code to AEXIT_WRAPPER_CODE constant, simplify __aexit__ method, remove redundant comments - lib.rs: Reorganize exports with section comments - test_shutdown_stress.py: Add timeout constants (DEFAULT_TIMEOUT, THREADED_TIMEOUT, MIXED_CONCURRENCY_TIMEOUT), simplify embedded scripts No functional changes - all 24 tests pass. --- src/client.rs | 78 +++++++++---------------- src/lib.rs | 25 ++++---- src/runtime.rs | 92 ++++++++++------------------- tests/test_shutdown_stress.py | 106 +++++++++------------------------- 4 files changed, 96 insertions(+), 205 deletions(-) diff --git a/src/client.rs b/src/client.rs index a7b7cdc..b2430af 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,6 +10,21 @@ use crate::communicator::PyCommunicator; use crate::error::PyClientError; use crate::lock_manager::{EtcdLockManager, PyEtcdLockOption}; +/// Python wrapper coroutine for async exit. +/// +/// Shutdown sequence: +/// 1. Await inner_cleanup (tokio task) - returns true if this was the last context +/// 2. If last context: trigger_shutdown_fn() signals runtime to shut down +/// 3. Then await to_thread(join_fn) to block until runtime thread terminates +const AEXIT_WRAPPER_CODE: &std::ffi::CStr = c" +async def _aexit_wrapper(): + is_last = await inner_cleanup + if is_last: + trigger_shutdown_fn() + await to_thread(join_fn) +_result = _aexit_wrapper() +"; + #[pyclass(name = "ConnectOptions")] #[derive(Debug, Clone, Default)] pub struct PyConnectOptions(pub ConnectOptions); @@ -81,10 +96,9 @@ impl PyClient { connect_options: Option, lock_options: Option, ) -> Self { - let connect_options = connect_options.unwrap_or_default(); Self { endpoints, - connect_options, + connect_options: connect_options.unwrap_or_default(), lock_options, lock_manager: None, } @@ -118,7 +132,6 @@ impl PyClient { #[pyo3(signature = ())] fn __aenter__<'a>(&'a mut self, py: Python<'a>) -> PyResult> { - // Increment context count before any async work crate::runtime::enter_context(); let endpoints = self.endpoints.clone(); @@ -130,7 +143,6 @@ impl PyClient { self.clone(), lock_options.clone(), )))); - Some(self.lock_manager.clone().unwrap()) } else { None @@ -146,7 +158,6 @@ impl PyClient { } } Err(e) => { - // Connection failed - decrement context count to maintain balance crate::runtime::exit_context(); Err(PyClientError(e).into()) } @@ -160,64 +171,31 @@ impl PyClient { py: Python<'a>, _args: &Bound<'a, PyTuple>, ) -> PyResult> { - let lock_options = self.lock_options.clone(); + let lock_manager = self + .lock_options + .as_ref() + .map(|_| self.lock_manager.clone().unwrap()); - let lock_manager = if lock_options.is_some() { - Some(self.lock_manager.clone().unwrap()) - } else { - None - }; - - // Create the tokio async cleanup coroutine - // Returns True if this was the last client (context count dropped to 0) + // Tokio task: cleanup and return whether this was the last context let inner_cleanup = future_into_py(py, 
async move { if let Some(lock_manager) = lock_manager { lock_manager.lock().await.handle_aexit().await?; } - - // Decrement context count after all cleanup is done - // Returns true if this was the last active context - let is_last_context = crate::runtime::exit_context(); - - Ok(is_last_context) + Ok(crate::runtime::exit_context()) })?; - // Get the runtime functions for use in the wrapper + // Build Python wrapper coroutine let etcd_client = py.import("etcd_client")?; - let trigger_shutdown_fn = etcd_client.getattr("_trigger_shutdown")?; - let join_fn = etcd_client.getattr("_join_pending_shutdown")?; - - // Create a Python wrapper coroutine that: - // 1. Awaits the inner cleanup (tokio task) - // 2. If this was the last context, trigger shutdown AFTER tokio task completes - // 3. Then await the blocking join via to_thread let asyncio = py.import("asyncio")?; - let to_thread = asyncio.getattr("to_thread")?; - // Build the wrapper coroutine using Python code let globals = PyDict::new(py); globals.set_item("inner_cleanup", inner_cleanup)?; - globals.set_item("trigger_shutdown_fn", trigger_shutdown_fn)?; - globals.set_item("join_fn", join_fn)?; - globals.set_item("to_thread", to_thread)?; + globals.set_item("trigger_shutdown_fn", etcd_client.getattr("_trigger_shutdown")?)?; + globals.set_item("join_fn", etcd_client.getattr("_join_pending_shutdown")?)?; + globals.set_item("to_thread", asyncio.getattr("to_thread")?)?; - py.run( - c" -async def _aexit_wrapper(): - is_last_context = await inner_cleanup - if is_last_context: - # Trigger shutdown AFTER the tokio task has completed - # This avoids a race where shutdown starts while the task is returning - trigger_shutdown_fn() - await to_thread(join_fn) - return None -_result = _aexit_wrapper() -", - Some(&globals), - None, - )?; + py.run(AEXIT_WRAPPER_CODE, Some(&globals), None)?; - let result = globals.get_item("_result")?.unwrap(); - Ok(result) + Ok(globals.get_item("_result")?.unwrap()) } } diff --git a/src/lib.rs b/src/lib.rs index b074319..db33a84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,26 +23,27 @@ mod etcd_client { }; use pyo3::prelude::*; + // Classes #[pymodule_export] - use crate::txn::{PyTxn, PyTxnOp}; + use crate::client::{PyClient, PyConnectOptions}; #[pymodule_export] - use crate::txn_response::PyTxnResponse; + use crate::communicator::PyCommunicator; #[pymodule_export] - use crate::lock_manager::PyEtcdLockOption; + use crate::compare::{PyCompare, PyCompareOp}; #[pymodule_export] - use crate::client::{PyClient, PyConnectOptions}; + use crate::condvar::PyCondVar; #[pymodule_export] - use crate::communicator::PyCommunicator; + use crate::lock_manager::PyEtcdLockOption; #[pymodule_export] - use crate::compare::{PyCompare, PyCompareOp}; + use crate::txn::{PyTxn, PyTxnOp}; #[pymodule_export] - use crate::condvar::PyCondVar; + use crate::txn_response::PyTxnResponse; #[pymodule_export] use crate::watch::PyWatch; @@ -57,6 +58,7 @@ mod etcd_client { fn init(m: &Bound<'_, PyModule>) -> PyResult<()> { let py = m.py(); + // Exception types m.add("ClientError", py.get_type::())?; m.add("GRPCStatusError", py.get_type::())?; m.add("InvalidArgsError", py.get_type::())?; @@ -67,15 +69,14 @@ mod etcd_client { m.add("Utf8Error", py.get_type::())?; m.add("LeaseKeepAliveError", py.get_type::())?; m.add("ElectError", py.get_type::())?; - m.add( - "InvalidHeaderValueError", - py.get_type::(), - )?; + m.add("InvalidHeaderValueError", py.get_type::())?; m.add("EndpointError", py.get_type::())?; - // Add runtime functions + // Runtime management 
(public API) m.add_function(wrap_pyfunction!(crate::runtime::cleanup_runtime, m)?)?; m.add_function(wrap_pyfunction!(crate::runtime::active_context_count, m)?)?; + + // Runtime internals (used by __aexit__) m.add_function(wrap_pyfunction!(crate::runtime::_trigger_shutdown, m)?)?; m.add_function(wrap_pyfunction!(crate::runtime::_join_pending_shutdown, m)?)?; diff --git a/src/runtime.rs b/src/runtime.rs index 8cf1ae2..a903cdb 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,92 +1,58 @@ use pyo3::prelude::*; use std::sync::atomic::{AtomicUsize, Ordering}; -/// Count of active client context managers (those currently inside `async with`). +/// Shutdown timeout in milliseconds for graceful runtime cleanup. +const SHUTDOWN_TIMEOUT_MS: u64 = 5000; + +/// Active client context count. static ACTIVE_CONTEXTS: AtomicUsize = AtomicUsize::new(0); -/// Called when a client enters its async context (`__aenter__`). -/// -/// Increments the active context count. If this is the first context after -/// a previous shutdown, the tokio runtime will be lazily re-initialized -/// on the first spawn operation. -pub fn enter_context() { +// ============================================================================ +// Context Counting (internal) +// ============================================================================ + +/// Increment context count on `__aenter__`. +pub(crate) fn enter_context() { ACTIVE_CONTEXTS.fetch_add(1, Ordering::SeqCst); } -/// Called when a client exits its async context (`__aexit__`). -/// -/// Decrements the active context count and returns `true` if this was the -/// last active context (count drops from 1 to 0). -/// -/// Note: This function does NOT trigger the shutdown directly. The caller -/// should trigger shutdown from Python AFTER the tokio task completes, to -/// avoid a race condition where the runtime starts shutting down while the -/// task is still returning its result. -/// -/// Returns `true` if this was the last context, `false` otherwise. -pub fn exit_context() -> bool { +/// Decrement context count on `__aexit__`. +/// Returns `true` if this was the last context (count dropped to 0). +pub(crate) fn exit_context() -> bool { let prev = ACTIVE_CONTEXTS.fetch_sub(1, Ordering::SeqCst); prev == 1 } -/// Get the current count of active client contexts. -/// -/// Useful for debugging and testing automatic cleanup behavior. -/// Returns 0 when no clients are in an active context manager. +// ============================================================================ +// Public API +// ============================================================================ + +/// Get current active context count (for debugging/testing). #[pyfunction] pub fn active_context_count() -> usize { ACTIVE_CONTEXTS.load(Ordering::SeqCst) } -/// Explicitly request graceful shutdown of the tokio runtime. -/// -/// In most cases, the runtime is automatically cleaned up when the last -/// client context exits. This function is provided for cases where explicit -/// control is needed. +/// Explicitly request graceful runtime shutdown. /// -/// # Example -/// -/// ```python -/// import asyncio -/// from etcd_client import cleanup_runtime -/// -/// async def main(): -/// # Your etcd operations here -/// ... -/// # Explicit cleanup (usually not needed) -/// cleanup_runtime() -/// -/// asyncio.run(main()) -/// ``` -/// -/// This function uses tokio's `shutdown_timeout()` to gracefully shut down all tasks, -/// waiting up to 5 seconds for pending tasks to complete. 
+/// Usually not needed - runtime is automatically cleaned up when the last +/// client context exits. #[pyfunction] pub fn cleanup_runtime() { - pyo3_async_runtimes::tokio::request_shutdown(5000); + pyo3_async_runtimes::tokio::request_shutdown(SHUTDOWN_TIMEOUT_MS); } -/// Internal function to trigger runtime shutdown in the background. -/// -/// This is called from Python AFTER the tokio task has completed and -/// returned its result. This avoids a race condition where the runtime -/// starts shutting down while a task is still trying to return. -/// -/// Users should not call this directly - it's used internally by the -/// client's `__aexit__` implementation. +// ============================================================================ +// Internal Shutdown Helpers (used by __aexit__) +// ============================================================================ + +/// Trigger runtime shutdown in background. Called from Python after tokio task completes. #[pyfunction] pub fn _trigger_shutdown() { - pyo3_async_runtimes::tokio::request_shutdown_background(5000); + pyo3_async_runtimes::tokio::request_shutdown_background(SHUTDOWN_TIMEOUT_MS); } -/// Internal function to join the pending runtime shutdown thread. -/// -/// This is called from Python via `asyncio.to_thread()` to block until -/// the runtime thread has fully terminated. The GIL is released during -/// the blocking wait. -/// -/// Users should not call this directly - it's used internally by the -/// client's `__aexit__` implementation. +/// Block until runtime thread terminates. Called via `asyncio.to_thread()`. #[pyfunction] pub fn _join_pending_shutdown(py: Python<'_>) -> bool { pyo3_async_runtimes::tokio::join_pending_shutdown(py) diff --git a/tests/test_shutdown_stress.py b/tests/test_shutdown_stress.py index 954ef29..bc44df9 100644 --- a/tests/test_shutdown_stress.py +++ b/tests/test_shutdown_stress.py @@ -21,16 +21,16 @@ import pytest +# Subprocess timeout values (seconds) +DEFAULT_TIMEOUT = 10 +THREADED_TIMEOUT = 20 # Multi-threaded tests need more time +MIXED_CONCURRENCY_TIMEOUT = 30 # Most complex scenario -def _make_test_script(test_code: str, etcd_port: int) -> str: - """Create a test script for subprocess execution. - Note: No explicit cleanup_runtime() call is needed - automatic cleanup - happens when the last client context exits. - """ +def _make_test_script(test_code: str, etcd_port: int) -> str: + """Create a test script for subprocess execution.""" return f""" import asyncio - from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair async def main(): @@ -41,26 +41,17 @@ async def main(): ConfigScopes.GLOBAL: "global", }}, ) - {test_code} - # No explicit cleanup_runtime() needed - automatic cleanup on context exit - if __name__ == "__main__": asyncio.run(main()) """ def _run_subprocess_test( - script_content: str, iterations: int = 10, timeout: int = 10 + script_content: str, iterations: int = 10, timeout: int = DEFAULT_TIMEOUT ) -> None: - """Run a test script in subprocess multiple times to detect shutdown issues. - - Args: - script_content: The Python script to run. - iterations: Number of times to run the script. - timeout: Subprocess timeout in seconds per iteration (default 10). 
- """ + """Run a test script in subprocess multiple times to detect shutdown issues.""" project_root = str(Path(__file__).parent.parent.resolve()) env = os.environ.copy() env["PYTHONPATH"] = project_root @@ -81,14 +72,12 @@ def _run_subprocess_test( ) if result.returncode != 0: - failures.append( - { - "iteration": i + 1, - "returncode": result.returncode, - "stderr": result.stderr, - "stdout": result.stdout, - } - ) + failures.append({ + "iteration": i + 1, + "returncode": result.returncode, + "stderr": result.stderr, + "stdout": result.stdout, + }) if failures: error_msg = f"Failed {len(failures)}/{iterations} iterations:\n" @@ -115,7 +104,6 @@ async def test_shutdown_with_active_watch(etcd_container) -> None: watch_iter = etcd.watch("test_key") await etcd.put("test_key", "value1") """ - script = _make_test_script(test_code, etcd_port) _run_subprocess_test(script, iterations=20) @@ -132,7 +120,6 @@ async def test_shutdown_with_concurrent_operations(etcd_container) -> None: tasks.append(etcd.put(f"key_{i}", f"value_{i}")) await asyncio.gather(*tasks) """ - script = _make_test_script(test_code, etcd_port) _run_subprocess_test(script, iterations=20) @@ -146,14 +133,13 @@ async def test_shutdown_rapid_subprocess(etcd_container) -> None: async with etcd: await etcd.put("rapid_test", "value") """ - script = _make_test_script(test_code, etcd_port) _run_subprocess_test(script, iterations=50) @pytest.mark.asyncio async def test_shutdown_sanity_check(etcd_container) -> None: - """Verify that the subprocess test infrastructure works correctly.""" + """Verify subprocess test infrastructure works correctly.""" etcd_port = etcd_container.get_exposed_port(2379) test_code = """ @@ -162,18 +148,13 @@ async def test_shutdown_sanity_check(etcd_container) -> None: value = await etcd.get("sanity") assert value == "check" """ - script = _make_test_script(test_code, etcd_port) _run_subprocess_test(script, iterations=5) @pytest.mark.asyncio async def test_shutdown_multi_async_tasks(etcd_container) -> None: - """Test shutdown with multiple concurrent async tasks, each with its own client. - - This tests the reference counting mechanism with multiple clients sharing - one event loop - shutdown should only happen when the last client exits. - """ + """Test shutdown with multiple concurrent async tasks sharing one event loop.""" etcd_port = etcd_container.get_exposed_port(2379) script = f""" @@ -184,9 +165,7 @@ async def worker(worker_id: int): etcd = AsyncEtcd( addr=HostPortPair(host="127.0.0.1", port={etcd_port}), namespace=f"test_multi_task_{{worker_id}}", - scope_prefix_map={{ - ConfigScopes.GLOBAL: "global", - }}, + scope_prefix_map={{ConfigScopes.GLOBAL: "global"}}, ) async with etcd: for i in range(5): @@ -194,24 +173,18 @@ async def worker(worker_id: int): await asyncio.sleep(0.01) async def main(): - # Launch 5 concurrent tasks, each with its own client tasks = [asyncio.create_task(worker(i)) for i in range(5)] await asyncio.gather(*tasks) if __name__ == "__main__": asyncio.run(main()) """ - _run_subprocess_test(script, iterations=20) @pytest.mark.asyncio async def test_shutdown_multi_threaded(etcd_container) -> None: - """Test shutdown with multiple threads, each running its own event loop. - - This tests thread safety of the reference counting mechanism - each thread - has its own asyncio event loop and creates/destroys clients independently. 
- """ + """Test shutdown with multiple threads, each running its own event loop.""" etcd_port = etcd_container.get_exposed_port(2379) script = f""" @@ -225,33 +198,24 @@ async def async_work(): etcd = AsyncEtcd( addr=HostPortPair(host="127.0.0.1", port={etcd_port}), namespace=f"test_multi_thread_{{thread_id}}", - scope_prefix_map={{ - ConfigScopes.GLOBAL: "global", - }}, + scope_prefix_map={{ConfigScopes.GLOBAL: "global"}}, ) async with etcd: for i in range(3): await etcd.put(f"key_{{thread_id}}_{{i}}", f"value_{{i}}") - asyncio.run(async_work()) results.append(thread_id) except Exception as e: errors.append((thread_id, str(e))) def main(): - results = [] - errors = [] - threads = [] - - # Launch 4 threads, each with its own event loop and client + results, errors, threads = [], [], [] for i in range(4): t = threading.Thread(target=thread_worker, args=(i, results, errors)) threads.append(t) t.start() - for t in threads: t.join(timeout=10) - if errors: raise RuntimeError(f"Thread errors: {{errors}}") if len(results) != 4: @@ -260,18 +224,12 @@ def main(): if __name__ == "__main__": main() """ - - # Use longer timeout (20s) for multi-threaded test due to thread setup overhead - _run_subprocess_test(script, iterations=10, timeout=20) + _run_subprocess_test(script, iterations=10, timeout=THREADED_TIMEOUT) @pytest.mark.asyncio async def test_shutdown_mixed_concurrency(etcd_container) -> None: - """Test shutdown with mixed concurrency: multiple threads with multiple async tasks each. - - This is the most complex scenario - multiple threads each running their own - event loop with multiple concurrent async tasks. - """ + """Test shutdown with multiple threads, each running multiple async tasks.""" etcd_port = etcd_container.get_exposed_port(2379) script = f""" @@ -285,15 +243,12 @@ async def async_task(task_id: int): etcd = AsyncEtcd( addr=HostPortPair(host="127.0.0.1", port={etcd_port}), namespace=f"test_mixed_{{thread_id}}_{{task_id}}", - scope_prefix_map={{ - ConfigScopes.GLOBAL: "global", - }}, + scope_prefix_map={{ConfigScopes.GLOBAL: "global"}}, ) async with etcd: - await etcd.put(f"key", f"value_{{thread_id}}_{{task_id}}") + await etcd.put("key", f"value_{{thread_id}}_{{task_id}}") async def run_tasks(): - # Each thread runs 3 concurrent async tasks tasks = [asyncio.create_task(async_task(i)) for i in range(3)] await asyncio.gather(*tasks) @@ -303,19 +258,13 @@ async def run_tasks(): errors.append((thread_id, str(e))) def main(): - results = [] - errors = [] - threads = [] - - # Launch 3 threads + results, errors, threads = [], [], [] for i in range(3): t = threading.Thread(target=thread_worker, args=(i, results, errors)) threads.append(t) t.start() - for t in threads: t.join(timeout=15) - if errors: raise RuntimeError(f"Thread errors: {{errors}}") if len(results) != 3: @@ -324,7 +273,4 @@ def main(): if __name__ == "__main__": main() """ - - # Use longer timeout (30s) for mixed concurrency test - most complex scenario - # with multiple threads each running multiple async tasks - _run_subprocess_test(script, iterations=10, timeout=30) + _run_subprocess_test(script, iterations=10, timeout=MIXED_CONCURRENCY_TIMEOUT) From d510a3c4a3352f7e79fc6f5bb4b7e7951ceebe09 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 18:12:17 +0900 Subject: [PATCH 25/28] docs: Reorganize README structure for clarity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add "Working with key prefixes" subsection under Basic usage - Move "Automatic runtime cleanup" 
to its own top-level section - Add "Lock timeout" and "Lock TTL" subsections under Etcd lock - Add "Watch with prefix" subsection under Watch - Condense code quality section - Fix typo: http::// → http:// --- README.md | 139 +++++++++++++++++++++--------------------------------- 1 file changed, 54 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index c7693c5..91a4db7 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ pip install etcd_client ```python from etcd_client import EtcdClient -etcd = EtcdClient(['http:://127.0.0.1:2379']) +etcd = EtcdClient(['http://127.0.0.1:2379']) ``` Actual connection establishment with Etcd's gRPC channel will be done when you call `EtcdClient.connect()`. @@ -28,11 +28,34 @@ async def main(): print(bytes(value).decode()) # testvalue ``` -### Automatic runtime cleanup +### Working with key prefixes + +`EtcdCommunicator.get_prefix(prefix)` returns a list of key-value pairs matching the given prefix. + +```python +async def main(): + async with etcd.connect() as communicator: + await communicator.put('/testdir'.encode(), 'root'.encode()) + await communicator.put('/testdir/1'.encode(), '1'.encode()) + await communicator.put('/testdir/2'.encode(), '2'.encode()) + await communicator.put('/testdir/2/3'.encode(), '3'.encode()) + + test_dir = await communicator.get_prefix('/testdir'.encode()) + + for resp in test_dir: + # ['/testdir', 'root'] + # ['/testdir/1', '1'] + # ['/testdir/2', '2'] + # ['/testdir/2/3', '3'] + print([bytes(v).decode() for v in resp]) +``` + +## Automatic runtime cleanup The tokio runtime is automatically cleaned up when the last client context exits. In most cases, no explicit cleanup is needed: ```python +import asyncio from etcd_client import EtcdClient async def main(): @@ -48,7 +71,7 @@ asyncio.run(main()) The library uses reference counting to track active client contexts. When the last context exits, the tokio runtime is gracefully shut down, waiting up to 5 seconds for pending tasks to complete. If you create new clients after this, the runtime is automatically re-initialized. -For advanced use cases requiring explicit control, `cleanup_runtime()` is still available: +For advanced use cases requiring explicit control, `cleanup_runtime()` is available: ```python from etcd_client import cleanup_runtime @@ -57,29 +80,9 @@ from etcd_client import cleanup_runtime cleanup_runtime() ``` -`EtcdCommunicator.get_prefix(prefix)` will return a tuple of list containing all key-values with given key prefix. - -```python -async def main(): - async with etcd.connect() as communicator: - await communicator.put('/testdir'.encode(), 'root'.encode()) - await communicator.put('/testdir/1'.encode(), '1'.encode()) - await communicator.put('/testdir/2'.encode(), '2'.encode()) - await communicator.put('/testdir/2/3'.encode(), '3'.encode()) - - test_dir = await communicator.get_prefix('/testdir'.encode()) - - for resp in test_dir: - # ['/testdir', 'root'] - # ['/testdir/1', '1'] - # ['/testdir/2', '2'] - # ['/testdir/2/3', '3'] - print([bytes(v).decode() for v in resp]) -``` - ## Operating with Etcd lock -Just like `EtcdClient.connect()`, you can easilly use etcd lock by calling `EtcdClient.with_lock(lock_opts)`. +Just like `EtcdClient.connect()`, you can easily use etcd lock by calling `EtcdClient.with_lock(lock_opts)`. 
```python async def first(): @@ -106,18 +109,11 @@ async with etcd.connect() as communicator: await asyncio.gather(first(), second()) # first: testvalue | second: testvalue ``` -Adding `timeout` parameter to `EtcdClient.with_lock()` call will add a timeout to lock acquiring process. +### Lock timeout -```python -async def first(): - async with etcd.with_lock( - EtcdLockOption( - lock_name='foolock'.encode(), - ) - ) as communicator: - value = await communicator.get('testkey'.encode()) - print('first:', bytes(value).decode(), end=' | ') +Adding `timeout` parameter to `EtcdLockOption` will add a timeout to the lock acquiring process. +```python async def second(): await asyncio.sleep(0.1) async with etcd.with_lock( @@ -128,13 +124,11 @@ async def second(): ) as communicator: value = await communicator.get('testkey'.encode()) print('second:', bytes(value).decode()) - -async with etcd.connect() as communicator: - await communicator.put('testkey'.encode(), 'testvalue'.encode()) -await asyncio.gather(first(), second()) # first: testvalue | second: testvalue ``` -Adding `ttl` parameter to `EtcdClient.with_lock()` call will force lock to be released after given seconds. +### Lock TTL + +Adding `ttl` parameter to `EtcdLockOption` will force the lock to be released after the given seconds. ```python async def first(): @@ -162,7 +156,7 @@ for task in done: ## Watch -You can watch changes on key with `EtcdCommunicator.watch(key)`. +You can watch changes on a key with `EtcdCommunicator.watch(key)`. ```python async def watch(): @@ -187,7 +181,9 @@ await asyncio.gather(watch(), update()) # WatchEventType.PUT 5 ``` -Watching changes on keys with specific prefix can be also done by `EtcdCommunicator.watch_prefix(key_prefix)`. +### Watch with prefix + +Watching changes on keys with a specific prefix can be done with `EtcdCommunicator.watch_prefix(key_prefix)`. ```python async def watch(): @@ -212,11 +208,11 @@ await asyncio.gather(watch(), update()) ## Transaction -You can run etcd transaction by calling `EtcdCommunicator.txn(txn)`. +You can run etcd transactions by calling `EtcdCommunicator.txn(txn)`. ### Constructing compares -Constructing compare operations can be done by comparing `Compare` instance. +Constructing compare operations can be done using the `Compare` class. ```python from etcd_client import Compare, CompareOp @@ -226,7 +222,7 @@ compares = [ ] ``` -### Executing transaction calls +### Executing transactions ```python async with etcd.connect() as communicator: @@ -240,29 +236,26 @@ async with etcd.connect() as communicator: ] res = await communicator.txn(Txn().when(compares).and_then([TxnOp.get('successkey'.encode())])) - print(res) # TODO: Need to write response type bindings. + print(res) # TODO: Need to write response type bindings. ``` ## How to build -### Prerequisite +### Prerequisites -* The Rust development environment (the 2021 edition or later) using [`rustup`](https://rustup.rs/) or your package manager +* The Rust development environment (2021 edition or later) using [`rustup`](https://rustup.rs/) or your package manager * The Python development environment (3.10 or later) using [`pyenv`](https://github.com/pyenv/pyenv#installation) or your package manager -### Build instruction +### Build instructions -First, create a virtualenv (either using the standard venv package, pyenv, or -whatever your favorite). Then, install the PEP-517 build toolchain and run it. +First, create a virtualenv (using the standard venv package, pyenv, or your preferred tool). 
Then, install the PEP-517 build toolchain and run it. ```shell pip install -U pip build setuptools python -m build --sdist --wheel ``` -It will automatically install build dependencies like -[`maturin`](https://github.com/PyO3/maturin) and build the wheel and source -distributions under the `dist/` directory. +This will automatically install build dependencies like [`maturin`](https://github.com/PyO3/maturin) and build the wheel and source distributions under the `dist/` directory. ## How to develop and test @@ -287,42 +280,18 @@ uv run maturin develop # Builds and installs the Rust extension This project uses ruff for linting/formatting and mypy for type checking: ```bash -# Format Python code -make fmt-py - -# Lint Python code -make lint-py - -# Auto-fix Python issues (format + fixable lints) -make fix-py - -# Type check Python code -make typecheck - -# Auto-fix Rust issues (format + fixable clippy lints) -make fix-rust - -# Auto-fix all issues (Python + Rust) -make fix - -# Format all code (Python + Rust) -make fmt - -# Lint all code (Python + Rust) -make lint - -# Run all checks (Python + Rust) -make check +make fmt # Format all code (Python + Rust) +make lint # Lint all code (Python + Rust) +make fix # Auto-fix all issues (Python + Rust) +make typecheck # Type check Python code +make check # Run all checks ``` ### Running tests ```bash -# Run tests using uv -make test - -# Or directly with uv -uv run pytest +make test # Run tests using uv +uv run pytest # Or directly with uv -# The tests use testcontainers to automatically spin up etcd +# Tests use testcontainers to automatically spin up etcd ``` From 0352b4cc2278d8aa4c4bcd2718b832d3e5d6e4bf Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 18:13:28 +0900 Subject: [PATCH 26/28] ci: Clean up debug codes in CI workflow --- .github/workflows/ci.yml | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3fd8db..321399f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,23 +49,12 @@ jobs: - name: Check python build run: | uv run --no-project python -c 'import sys; print(sys.prefix); print(sys.version_info)' - uv run --no-project python -c 'import sysconfig; print("Py_GIL_DISABLED=", sysconfig.get_config_var("Py_GIL_DISABLED"))' + uv run --no-project python -c 'import sysconfig; print(f"Py_GIL_DISABLED={sysconfig.get_config_var("Py_GIL_DISABLED")}")' uv run --no-project python -c 'import sys; print(f"{sys.abiflags=}")' - which python - python --version - which -a python3.14 || true - which -a python3.14t || true - name: Install dependencies and build the package run: | uv sync --locked --all-extras --no-install-project - PYO3_PRINT_CONFIG=1 uv run maturin develop 2>&1 || true uv run maturin develop - - name: Upload built extension module (for debugging) - if: matrix.platform.arch == 'aarch64' && matrix.python-version == '3.14' - uses: actions/upload-artifact@v4 - with: - name: debug-extension-${{ matrix.platform.arch }}-${{ matrix.python-version }} - path: python/etcd_client/*.so - name: Test run: | uv run pytest From c972c99f5ceb87ab79dd592ea59e13ae33112be3 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 18:17:25 +0900 Subject: [PATCH 27/28] ci: Arghhh... 
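The f-string added in the previous commit reuses double quotes inside its replacement field, which is a syntax error before Python 3.12 (PEP 701); hoisting the config lookup into a variable keeps the one-liner parseable on older interpreters. A minimal illustration of the difference (the workflow change below does the same thing inline):

```python
import sysconfig

# Rejected by the parser before Python 3.12 (same quote type nested
# inside the f-string replacement field):
#   print(f"Py_GIL_DISABLED={sysconfig.get_config_var("Py_GIL_DISABLED")}")

# Portable form, matching the fixed workflow step:
flag = sysconfig.get_config_var("Py_GIL_DISABLED")
print(f"Py_GIL_DISABLED={flag}")
```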
--- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 321399f..9f532d7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,7 @@ jobs: - name: Check python build run: | uv run --no-project python -c 'import sys; print(sys.prefix); print(sys.version_info)' - uv run --no-project python -c 'import sysconfig; print(f"Py_GIL_DISABLED={sysconfig.get_config_var("Py_GIL_DISABLED")}")' + uv run --no-project python -c 'import sysconfig; flag=sysconfig.get_config_var("Py_GIL_DISABLED"); print(f"Py_GIL_DISABLED={flag}")' uv run --no-project python -c 'import sys; print(f"{sys.abiflags=}")' - name: Install dependencies and build the package run: | From f1be35f6af3320b539ff218f8032d4b5adbe0ae2 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Fri, 9 Jan 2026 18:40:00 +0900 Subject: [PATCH 28/28] chore: Update vendored pyo3-async-runtimes with structured commits Updates vendored pyo3-async-runtimes (PyO3/pyo3-async-runtimes#71) with cleaner commit history: 1. deps: Replace futures with futures-channel/futures-util, add parking_lot 2. feat(tokio): Add RuntimeWrapper with graceful shutdown support 3. feat(async-std): Add spawn/spawn_blocking/request_shutdown for API consistency 4. refactor(macros): Update to use new spawn_blocking API 5. test: Add shutdown tests and update existing tests for deprecated API --- vendor/pyo3-async-runtimes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/pyo3-async-runtimes b/vendor/pyo3-async-runtimes index 1b266e5..97414ae 160000 --- a/vendor/pyo3-async-runtimes +++ b/vendor/pyo3-async-runtimes @@ -1 +1 @@ -Subproject commit 1b266e5f06fbed5c11f2ce5fa55e71820cfcab36 +Subproject commit 97414ae357f9fe82bcab9ceeb5dc1cbec93c24d7