Skip to content

Commit

Permalink
Ensure Locks always register with scheduler (#8781)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Jul 19, 2024
1 parent 31cb89c commit 70ae414
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 17 deletions.
6 changes: 3 additions & 3 deletions distributed/lock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import uuid

from distributed.semaphore import Semaphore

Expand Down Expand Up @@ -51,7 +52,6 @@ def __init__(
self,
name=None,
client=_no_value,
register=True,
scheduler_rpc=None,
loop=None,
):
Expand All @@ -64,10 +64,10 @@ def __init__(
stacklevel=2,
)

self.name = name or "lock-" + uuid.uuid4().hex
super().__init__(
max_leases=1,
name=name,
register=register,
scheduler_rpc=scheduler_rpc,
loop=loop,
)
Expand Down Expand Up @@ -112,4 +112,4 @@ def __getstate__(self):
return self.name

def __setstate__(self, state):
self.__init__(name=state, register=False)
self.__init__(name=state)
12 changes: 5 additions & 7 deletions distributed/semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ def __init__(
self,
max_leases=1,
name=None,
register=True,
scheduler_rpc=None,
loop=None,
):
Expand All @@ -344,9 +343,7 @@ def __init__(

self.refresh_leases = True

self._do_register = None
if register:
self._do_register = register
self._registered = False

# this should give ample time to refresh without introducing another
# config parameter since this *must* be smaller than the timeout anyhow
Expand Down Expand Up @@ -403,6 +400,8 @@ def _verify_running(self):
)

async def _register(self):
if self._registered:
return
lease_timeout = dask.config.get("distributed.scheduler.locks.lease-timeout")

if lease_timeout == "inf":
Expand All @@ -416,14 +415,14 @@ async def _register(self):
lease_timeout=lease_timeout,
operation=f"semaphore register id={self.id} name={self.name}",
)
self._registered = True

def register(self, **kwargs):
return self.sync(self._register)

def __await__(self):
async def create_semaphore():
if self._do_register:
await self._register()
await self._register()
return self

return create_semaphore().__await__()
Expand Down Expand Up @@ -558,7 +557,6 @@ def __setstate__(self, state):
self.__init__(
name=name,
max_leases=max_leases,
register=False,
)

def close(self):
Expand Down
18 changes: 18 additions & 0 deletions distributed/tests/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,24 @@ def f(x, lock=None):
assert lock2.name == lock.name


@gen_cluster(client=True)
async def test_serializable_no_ctx(c, s, a, b):
def f(x, lock=None):
lock.acquire()
try:
assert lock.name == "x"
return x + 1
finally:
lock.release()

lock = Lock("x")
futures = c.map(f, range(10), lock=lock)
await c.gather(futures)

lock2 = pickle.loads(pickle.dumps(lock))
assert lock2.name == lock.name


@gen_cluster(client=True, nthreads=[])
async def test_locks(c, s):
async with Lock("x") as l1:
Expand Down
9 changes: 2 additions & 7 deletions distributed/tests/test_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,8 @@ async def test_close_async(c, s, a):
match="Closing semaphore test but there remain unreleased leases .*",
):
await sem.close()
# After close, the semaphore is reset
await sem.acquire()
with pytest.warns(
RuntimeWarning,
match="Closing semaphore test but there remain unreleased leases .*",
):
await sem.close()
with pytest.raises(RuntimeError, match="not known"):
await sem.acquire()

sem2 = await Semaphore(name="t2", max_leases=1)
assert await sem2.acquire()
Expand Down

0 comments on commit 70ae414

Please sign in to comment.