Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create BaseSemaphore class instead of using TaskSemaphore #252

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions s3transfer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,44 +605,53 @@ class NoResourcesAvailable(Exception):
pass


class TaskSemaphore:
def __init__(self, count):
"""A semaphore for the purpose of limiting the number of tasks

:param count: The size of semaphore
"""
self._semaphore = threading.Semaphore(count)
class BaseSemaphore:
"""Base class for semaphores"""

def acquire(self, tag, blocking=True):
"""Acquire the semaphore

:param tag: A tag identifying what is acquiring the semaphore. Note
that this is not really needed to directly use this class but is
needed for API compatibility with the SlidingWindowSemaphore
implementation.
:param block: If True, block until it can be acquired. If False,
:param tag: A tag identifying what is acquiring the semaphore. Needed
for API compatibility with the SlidingWindowSemaphore implementation.
:param blocking: If True, block until it can be acquired. If False,
do not block and raise an exception if cannot be acquired.

:returns: A token (can be None) to use when releasing the semaphore
"""
logger.debug("Acquiring %s", tag)
if not self._semaphore.acquire(blocking):
raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)
raise NotImplementedError("must implement acquire()")

def release(self, tag, acquire_token):
"""Release the semaphore

:param tag: A tag identifying what is releasing the semaphore
:param acquire_token: The token returned from when the semaphore was
acquired. Note that this is not really needed to directly use this
class but is needed for API compatibility with the
:param acquire_token: The token returned from when the semaphore was
acquired. Needed for API compatibility with the
SlidingWindowSemaphore implementation.
"""
raise NotImplementedError("must implement release()")


class TaskSemaphore(BaseSemaphore):
"""A wrapper around a simple semaphore"""

def __init__(self, count):
"""A semaphore for the purpose of limiting the number of tasks

:param count: The size of semaphore
"""
self._semaphore = threading.Semaphore(count)

def acquire(self, tag, blocking=True):
logger.debug("Acquiring %s", tag)
if not self._semaphore.acquire(blocking):
raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)

def release(self, tag, acquire_token):
logger.debug(f"Releasing acquire {tag}/{acquire_token}")
self._semaphore.release()


class SlidingWindowSemaphore(TaskSemaphore):
class SlidingWindowSemaphore(BaseSemaphore):
"""A semaphore used to coordinate sequential resource access.

This class is similar to the stdlib BoundedSemaphore:
Expand All @@ -660,7 +669,7 @@ class SlidingWindowSemaphore(TaskSemaphore):
this semaphore can also enforce that you only have a max range of
10 at any given point in time. You must also specify a tag name
when you acquire the semaphore. The sliding window semantics apply
on a per tag basis. The internal count will only be incremented
on a per-tag basis. The internal count will only be incremented
when the minimum sequence number for a tag is released.

"""
Expand All @@ -676,6 +685,7 @@ def __init__(self, count):
self._pending_release = {}

def current_count(self):
"""Current semaphore count"""
with self._lock:
return self._count

Expand Down