Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3ff7a4a
test: Add a reproducing case that crashes upon python shutdown
achimnol Jul 24, 2025
109ff71
test: Use ephemeral port numbers to run tests concurrently with a run…
achimnol Jul 24, 2025
38e7391
chore: Add helper script to run a standalone etcd locally
achimnol Jul 27, 2025
608d1f9
fix: Make AsyncEtcd as explicit async context manager, but still not …
achimnol Jul 27, 2025
aed0dd5
test: Let subprocess load tests.harness module
achimnol Jul 27, 2025
a1f206b
test: Let tests not rely on locally running etcd at 2379
achimnol Jul 27, 2025
2ae21ff
fix: typo
achimnol Jul 27, 2025
2a9efb7
Merge branch 'main' into topic/fix-crash-upon-python-shutdown
achimnol Jan 3, 2026
62b42d0
Merge branch 'main' into topic/fix-crash-upon-python-shutdown
achimnol Jan 3, 2026
29e45aa
fix: lint/typecheck failures
achimnol Jan 3, 2026
b7b5552
test: Update test codes
achimnol Jan 3, 2026
db0600c
test: Add more realistic test cases for shutdown crashes
achimnol Jan 3, 2026
7ee1a0c
fix: Implement the fix
achimnol Jan 3, 2026
59595d9
fix: Reimplement cleanup using atomic task counter with polling + tim…
achimnol Jan 3, 2026
e881f93
fix: Remove no longer used runtime mgmt codes
achimnol Jan 3, 2026
0cd1e55
refactor: Remove no longer needed codes
achimnol Jan 3, 2026
b48ac27
fix: Use tokio's own task counter to improve compat/interop with othe…
achimnol Jan 4, 2026
8cbc480
ci: Prevent duplicate builds during CI runs
achimnol Jan 4, 2026
fad7b0c
Merge branch 'main' into topic/fix-crash-upon-python-shutdown
achimnol Jan 4, 2026
2ecbc98
test: Use explicit runtime cleanup in subprocess shutdown tests
achimnol Jan 4, 2026
58d29be
refactor: Bundle and patch pyo3-async-runtimes for explicit shutdown
achimnol Jan 4, 2026
9e040d1
ci: Enable submodule checkout in CI workflow
achimnol Jan 4, 2026
44d6ec3
refactor: Switch to forked pyo3-async-runtimes with shutdown API
achimnol Jan 4, 2026
e767118
test: Call _cleanup_runtime() explicitly after event loop
achimnol Jan 5, 2026
c4782cf
refactor: Rename _cleanup_runtime to cleanup_runtime and move call in…
achimnol Jan 5, 2026
cdcb739
refactor: Use tokio's shutdown_timeout() instead of manual task tracking
achimnol Jan 5, 2026
fae768b
fix: Update submodule to remove unused imports
achimnol Jan 5, 2026
84cf34e
chore: Update pyo3-async-runtimes to valkey-glide pattern
achimnol Jan 5, 2026
49328ff
fix: Ensure synchronous shutdown with shutdown_timeout
achimnol Jan 5, 2026
e53111f
chore: Update pyo3-async-runtimes submodule
achimnol Jan 5, 2026
33c899b
refactor: Simplify code by removing EtcdRt wrapper
achimnol Jan 5, 2026
d4026f8
refactor: Import future_into_py for cleaner code
achimnol Jan 5, 2026
65ee436
chore: Remove outdated documentation and patch files
achimnol Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
steps:
- name: Checkout the revision
uses: actions/checkout@v4
with:
submodules: recursive
- name: Set up Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "vendor/pyo3-async-runtimes"]
path = vendor/pyo3-async-runtimes
url = https://github.com/lablup/pyo3-async-runtimes.git
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ crate-type = ["cdylib"]
[dependencies]
etcd-client = "0.16.1"
pyo3 = { version = "0.27.2", features = ["extension-module", "multiple-pymethods"] }
pyo3-async-runtimes = { version = "0.27.0", features = ["attributes", "tokio-runtime"] }
# Using patched version with explicit shutdown API
pyo3-async-runtimes = { path = "vendor/pyo3-async-runtimes", features = ["attributes", "tokio-runtime"] }
scopeguard = "1.2.0"
tokio = { version = "1.46.1", features = ["sync"] }
tokio-stream = "0.1.17"
Expand Down
29 changes: 29 additions & 0 deletions etcd_client.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,32 @@ class GRPCStatusCode(Enum):

Unauthenticated = 16
"""The request does not have valid authentication credentials."""


def cleanup_runtime() -> None:
    """
    Explicitly clean up the tokio runtime.

    This function shuts down the runtime, waiting up to 5 seconds for pending
    tasks to complete. It should be called at the end of your main async
    function, before the event loop shuts down.

    Example:
        ```python
        import asyncio

        from etcd_client import Client, cleanup_runtime

        async def main():
            # Your etcd operations here
            client = Client.connect(["localhost:2379"])
            await client.put("key", "value")
            # Clean up before returning
            cleanup_runtime()

        asyncio.run(main())
        ```

    Note:
        This is useful for ensuring clean shutdown and preventing GIL state
        violations during Python interpreter finalization.
    """
    ...
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ dev = [
requires = ["maturin>=1.7,<2.0"]
build-backend = "maturin"

[tool.maturin]
python-source = "python"

# === Tool Configuration ===

[tool.ruff]
Expand Down
1 change: 1 addition & 0 deletions python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.so
6 changes: 6 additions & 0 deletions python/etcd_client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .etcd_client import * # noqa: F403
from .etcd_client import cleanup_runtime # noqa: F401

# The relative imports above load the compiled submodule, which as a side
# effect binds it as the attribute ``etcd_client`` of this package; that is
# why the bare name is usable here even though it is never imported directly.
# Re-export the extension module's docstring and, when it declares one, its
# public-name list so the package mirrors the native module.
__doc__ = etcd_client.__doc__  # noqa: F405
if hasattr(etcd_client, "__all__"):  # noqa: F405
    __all__ = etcd_client.__all__  # noqa: F405
23 changes: 23 additions & 0 deletions scripts/run-etcd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/sh
# Start a throwaway single-node etcd in Docker for local development.
#
# The host data directory is wiped and recreated on every run, so no state
# survives between invocations. The container publishes the client port
# (2379) and the peer port (2380) on the host.
#
# Environment:
#   ETCD_VER  etcd image tag to run (default: v3.5.14)

# Abort on the first failing command or unset variable instead of carrying on
# with a half-prepared environment.
set -eu

ETCD_VER="${ETCD_VER:-v3.5.14}"
DATA_DIR=/tmp/etcd-data.tmp

rm -rf "${DATA_DIR}"
mkdir -p "${DATA_DIR}"

docker run -d \
  -p 2379:2379 \
  -p 2380:2380 \
  --mount "type=bind,source=${DATA_DIR},destination=/etcd-data" \
  --name "etcd-gcr-${ETCD_VER}" \
  "gcr.io/etcd-development/etcd:${ETCD_VER}" \
  /usr/local/bin/etcd \
  --name s1 \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://0.0.0.0:2379 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-advertise-peer-urls http://0.0.0.0:2380 \
  --initial-cluster s1=http://0.0.0.0:2380 \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr

1 change: 1 addition & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl PyClient {
result
}

#[pyo3(signature = ())]
fn __aenter__<'a>(&'a mut self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
let endpoints = self.endpoints.clone();
let connect_options = self.connect_options.clone();
Expand Down
1 change: 0 additions & 1 deletion src/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ impl PyCommunicator {
})
}

// TODO: Implement and use the response types of `lease` type's methods
fn lease_grant<'a>(&'a self, py: Python<'a>, ttl: i64) -> PyResult<Bound<'a, PyAny>> {
let client = self.0.clone();
future_into_py(py, async move {
Expand Down
4 changes: 2 additions & 2 deletions src/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl PyCondVar {
while !*condition.lock().await {
inner.notified().await;
}
Ok(())
Ok::<(), PyErr>(())
})
}

Expand All @@ -37,7 +37,7 @@ impl PyCondVar {
future_into_py(py, async move {
*condition.lock().await = true;
inner.notify_waiters();
Ok(())
Ok::<(), PyErr>(())
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod compare;
mod condvar;
mod error;
mod lock_manager;
mod runtime;
mod txn;
mod txn_response;
mod watch;
Expand Down Expand Up @@ -70,6 +71,10 @@ mod etcd_client {
py.get_type::<InvalidHeaderValueError>(),
)?;
m.add("EndpointError", py.get_type::<EndpointError>())?;

// Add runtime cleanup function
m.add_function(wrap_pyfunction!(crate::runtime::cleanup_runtime, m)?)?;

Ok(())
}
}
25 changes: 25 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use pyo3::prelude::*;

/// Gracefully shut down the tokio runtime.
///
/// Call this at the end of your main async function, before the event loop
/// shuts down:
///
/// ```python
/// import asyncio
/// from etcd_client import cleanup_runtime
///
/// async def main():
///     # Your etcd operations here
///     ...
///     # Cleanup before returning
///     cleanup_runtime()
///
/// asyncio.run(main())
/// ```
///
/// The shutdown goes through tokio's `shutdown_timeout()`, which gives pending
/// tasks up to 5 seconds to complete.
#[pyfunction]
pub fn cleanup_runtime() {
    // Upper bound on how long pending tasks get to finish, in milliseconds.
    let timeout_ms = 5000;
    pyo3_async_runtimes::tokio::request_shutdown(timeout_ms);
}
43 changes: 22 additions & 21 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,48 @@ def etcd_container():
with DockerContainer(
f"gcr.io/etcd-development/etcd:{ETCD_VER}",
command=_etcd_command,
).with_bind_ports("2379/tcp", 2379) as container:
).with_exposed_ports(2379) as container:
wait_for_logs(container, "ready to serve client requests")
yield
yield container


@pytest.fixture
async def etcd(etcd_container):
etcd_port = etcd_container.get_exposed_port(2379)
etcd = AsyncEtcd(
addr=HostPortPair(host="127.0.0.1", port=2379),
addr=HostPortPair(host="127.0.0.1", port=etcd_port),
namespace="test",
scope_prefix_map={
ConfigScopes.GLOBAL: "global",
ConfigScopes.SGROUP: "sgroup/testing",
ConfigScopes.NODE: "node/i-test",
},
)
try:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
await etcd.delete_prefix("", scope=ConfigScopes.SGROUP)
await etcd.delete_prefix("", scope=ConfigScopes.NODE)
yield etcd
finally:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
await etcd.delete_prefix("", scope=ConfigScopes.SGROUP)
await etcd.delete_prefix("", scope=ConfigScopes.NODE)
await etcd.close()
del etcd
async with etcd:
try:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
await etcd.delete_prefix("", scope=ConfigScopes.SGROUP)
await etcd.delete_prefix("", scope=ConfigScopes.NODE)
yield etcd
finally:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
await etcd.delete_prefix("", scope=ConfigScopes.SGROUP)
await etcd.delete_prefix("", scope=ConfigScopes.NODE)


@pytest.fixture
async def gateway_etcd(etcd_container):
etcd_port = etcd_container.get_exposed_port(2379)
etcd = AsyncEtcd(
addr=HostPortPair(host="127.0.0.1", port=2379),
addr=HostPortPair(host="127.0.0.1", port=etcd_port),
namespace="test",
scope_prefix_map={
ConfigScopes.GLOBAL: "",
},
)
try:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
yield etcd
finally:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
del etcd
async with etcd:
try:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
yield etcd
finally:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
20 changes: 18 additions & 2 deletions tests/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import functools
import logging
from collections import ChainMap, namedtuple
from types import TracebackType
from typing import (
Any,
AsyncGenerator,
Expand Down Expand Up @@ -165,8 +166,23 @@ def __init__(
connect_options=self._connect_options,
)

async def close(self):
pass # for backward compatibility
async def open(self) -> None:
    """Enter the wrapped etcd client's async context.

    Explicit-call counterpart of ``close()`` for callers that cannot use
    ``async with``.
    """
    client = self.etcd
    await client.__aenter__()

async def close(self) -> None:
    """Leave the wrapped etcd client's async context, reporting no exception."""
    no_exception = (None, None, None)
    await self.etcd.__aexit__(*no_exception)

async def __aenter__(self) -> "AsyncEtcd":
    """Enter the wrapped client's async context and hand back this wrapper.

    The value produced by the wrapped client's ``__aenter__`` is discarded;
    ``async with`` binds the wrapper itself.
    """
    client = self.etcd
    await client.__aenter__()
    return self

async def __aexit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Forward the exit notification, with its exception triple, to the wrapped client.

    Always returns ``None``, so an exception raised inside the ``async with``
    body is never suppressed by this wrapper.
    """
    exc_info = (exc_type, exc_val, exc_tb)
    await self.etcd.__aexit__(*exc_info)

def _mangle_key(self, k: str) -> str:
if k.startswith("/"):
Expand Down
73 changes: 73 additions & 0 deletions tests/test.py → tests/test_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,79 @@ async def _record_prefix():
assert records_prefix[3].value == ""


@pytest.mark.asyncio
async def test_subprocess_segfault_reproduction(etcd_container) -> None:
    """Test case to reproduce segfault when subprocess terminates quickly.

    A small script that writes one key through ``AsyncEtcd`` and exits
    immediately is run in a fresh interpreter five times; every run must exit
    with return code 0.

    Args:
        etcd_container: Running etcd container fixture; its mapped host port
            for 2379 is passed to the child script on the command line.
    """
    import os
    import subprocess
    import sys
    import tempfile
    from pathlib import Path

    # Create a script that will be run in subprocess
    script_content = """
import asyncio
import sys

from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair

async def main(etcd_port):
    etcd = AsyncEtcd(
        addr=HostPortPair(host="127.0.0.1", port=etcd_port),
        namespace="test_subprocess",
        scope_prefix_map={
            ConfigScopes.GLOBAL: "global",
        },
    )

    # Write a key and immediately exit
    async with etcd:
        await etcd.put("test_key", "test_value")

if __name__ == "__main__":
    etcd_port = int(sys.argv[1])
    asyncio.run(main(etcd_port))
"""
    etcd_port = etcd_container.get_exposed_port(2379)

    # Get project root directory (parent of tests directory)
    project_root = str(Path(__file__).parent.parent.resolve())

    # Put the project root first on the child's PYTHONPATH so that
    # ``tests.harness`` is importable, but keep whatever the parent already
    # had: overwriting it could hide packages the child needs.
    env = os.environ.copy()
    inherited_pythonpath = env.get("PYTHONPATH")
    if inherited_pythonpath:
        env["PYTHONPATH"] = os.pathsep.join([project_root, inherited_pythonpath])
    else:
        env["PYTHONPATH"] = project_root

    # The temporary directory (and the script in it) is removed on exit from
    # the ``with`` block, whether the loop passes, fails, or raises.
    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = Path(tmp_dir) / "shutdown_repro.py"
        script_path.write_text(script_content)

        # Run the subprocess 5 times to reproduce the segfault
        for i in range(5):
            result = subprocess.run(
                [sys.executable, "-u", str(script_path), str(etcd_port)],
                capture_output=True,
                text=True,
                timeout=10,
                env=env,
            )

            # Check if the subprocess completed successfully
            if result.returncode != 0:
                print(f"Subprocess {i + 1} failed with return code {result.returncode}")
                print(f"stderr: {result.stderr}")
                print(f"stdout: {result.stdout}")

            assert result.returncode == 0, (
                f"Subprocess {i + 1} failed with return code {result.returncode}"
            )


@pytest.mark.asyncio
async def test_watch_once(etcd: AsyncEtcd) -> None:
records = []
Expand Down
Loading