Skip to content

Commit

Permalink
Tidy queue code (#2261)
Browse files Browse the repository at this point in the history
This PR constitutes a full-court press, tidying up queue-related code
all around.

Changes:
- Use `case` more.
- Remove instances of `peek`.
- Remove the file `...queues/pifo.py`, which has been subsumed by
`strict_and_rr_queues.py`.
- Black pass.
- Whitespace fixes not addressed by Black.
  • Loading branch information
anshumanmohan authored Aug 21, 2024
1 parent c38598c commit 8d3708d
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 436 deletions.
22 changes: 14 additions & 8 deletions calyx-py/calyx/queue_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def insert_runner(prog, queue, name, num_cmds, use_ranks, stats_component=None):
# `1`: push
# - input `value`
# which is a 32-bit unsigned integer. If `cmd` is `1`, push this value.
# - ref register `ans`, into which the result of a pop or peek is written.
# - ref register `ans`, into which the result of a pop is written.
# - ref register `err`, which is raised if an error occurs.

# Our memories and registers, all of which are passed to us by reference.
Expand Down Expand Up @@ -223,17 +223,23 @@ def insert_main(
# If the dataplane component has an answer,
# write it to the answer-list and increment the index `i`.
cb.if_(
has_ans.out, # an answer exists, so we'll store it to `ans_mem` in the next line
has_ans.out, # an answer exists, so we'll store it to `ans_mem` in the next line
main.mem_store_d1(ans_mem, i.out, dataplane_ans.out, "write_ans"),
cb.if_(
dataplane_err.out, # there was an error
dataplane_err.out, # there was an error
main.mem_store_d1(
ans_mem, i.out, cb.const(32, 4294967295), "write_err" # store the value 2^32 - 1 (code for error) to `ans_mem`
),
main.mem_store_d1( # if we're here, we must be here because we were a successful push.
ans_mem, i.out, cb.const(32, 4294967294), "write_push" # store the value 2^32 - 2 (code for push) to `ans_mem`
ans_mem,
i.out,
cb.const(32, 4294967295),
"write_err", # store the value 2^32 - 1 (code for error) to `ans_mem`
),
),
main.mem_store_d1( # if we're here, we must be here because we were a successful push.
ans_mem,
i.out,
cb.const(32, 4294967294),
"write_push", # store the value 2^32 - 2 (code for push) to `ans_mem`
),
),
),
(
cb.invoke( # Invoke the controller component.
Expand Down
22 changes: 11 additions & 11 deletions calyx-py/calyx/queue_data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,31 @@ def no_err_cmds_list(queue_size, num_cmds):
commands += (push_goal - total_pop_count) * [0]
break

# If the total number of commands is not `num_cmds`, pad it with `peek`s.
# If the total number of commands is not `num_cmds`, pad it with `pop`s.
# This is because the `commands` list must have `num_cmds` items.
commands += (num_cmds - len(commands)) * [1]
# The above command will add either zero or one `peek` command to the end.
commands += (num_cmds - len(commands)) * [0]
# The above command will add either zero or one `pop` commands to the end.

assert (
len(commands) == num_cmds
), f"Length of commands list was {len(commands)}, expected {num_cmds}"
return commands


def dump_json(num_cmds, no_err: bool, queue_size: Optional[int]=None, nwc=False, use_ranks=False):
def dump_json(
num_cmds, no_err: bool, queue_size: Optional[int] = None, nwc=False, use_ranks=False
):
"""Prints a JSON representation of the data to stdout.
The data itself is populated randomly, following certain rules:
- It has three "memories": `commands`, `values`, and `ans_mem`.
- Optional memories `ranks` and `times` are included for queues primed for non-work-conserving algorithms.
- The `commands` memory has `num_cmds` items, which are 0 or 1 for both work-conserving and
- The `commands` memory has `num_cmds` items, which are 0 or 1 for both work-conserving and
non-work-conserving policies. They are as follows:
FOR WORK-CONSERVING POLICIES
0 : pop
1 : push
FOR NON-WORK-CONSERVING POLICIES
0 : pop by predicate
1 : push
Expand All @@ -96,7 +98,7 @@ def dump_json(num_cmds, no_err: bool, queue_size: Optional[int]=None, nwc=False,
- The `ans_mem` memory has `num_cmds` items, all zeroes.
- Each memory has a `format` field, which is a format object for a bitvector.
"""

commands = {
"commands": {
"data": (
Expand All @@ -105,9 +107,7 @@ def dump_json(num_cmds, no_err: bool, queue_size: Optional[int]=None, nwc=False,
if no_err
# If the `no_err` flag is set, then we use the special helper
# that ensures no overflow or overflow will occur.
else (
[random.randint(0, 1) for _ in range(num_cmds)]
)
else ([random.randint(0, 1) for _ in range(num_cmds)])
),
"format": format_gen(1),
}
Expand All @@ -134,7 +134,7 @@ def dump_json(num_cmds, no_err: bool, queue_size: Optional[int]=None, nwc=False,
# The `times` memory has `num_cmds` items, which are all
# random values between 0 and 50.
"format": format_gen(32),
}
}
}
ans_mem = {
"ans_mem": {
Expand Down
127 changes: 61 additions & 66 deletions calyx-py/calyx/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


class QueueError(Exception):
"""An error that occurs when we try to pop/peek from an empty queue,"""
"""An error that occurs when we try to pop from an empty queue,"""


class CmdError(Exception):
Expand Down Expand Up @@ -48,9 +48,9 @@ class Pifo:
Supports the operations `push` and `pop`.
We do this by maintaining two sub-queues that are given to us at initialization.
We toggle between these sub-queues when popping/peeking.
We toggle between these sub-queues when popping.
We have a variable called `hot` that says which sub-queue is to be
popped/peeked next.
popped next.
`hot` starts at 0.
We also take at initialization a `boundary` value.
Expand Down Expand Up @@ -130,14 +130,19 @@ class RankValue:
class NWCSimple:
"""A simple test oracle structure for non-work-conserving queues.
This serves the same purpose as PIEOs and Calendar Queues (see Python implementation below),
representing an abstract implementation for any non-work-conserving, time-dependent structure.
This serves the same purpose as PIEOs and Calendar Queues
(see Python implementation below),
representing an abstract implementation for any non-work-conserving,
time-dependent structure.
In our implementation, we support time-based 'ripeness' predicates,
which check that an element's encoded 'readiness time' is earlier than the specified current time.
which check that an element's encoded 'readiness time' is earlier than
the specified current time.
The term 'ripe', as defined above, takes in an element (which has some encoded readiness time),
and a specified 'current time'. It checks that the element's readiness time is <= the current time.
The term 'ripe', as defined above, takes in an element
(which has some encoded readiness time),
and a specified 'current time'. It checks that the element's readiness
time is <= the current time.
Supports the operations `push` and `pop`.
Expand All @@ -148,18 +153,22 @@ class NWCSimple:
When asked to push:
- If the queue is at length `max_len`, we raise an error.
- Otherwise, we insert the element into the PIEO such that the rank order stays increasing.
- To avoid ties between ranks, we left-shift the rank and then add either a custom buffer,
- Otherwise, we insert the element into the PIEO such that the rank order
stays increasing.
- To avoid ties between ranks, we left-shift the rank and then add either
a custom buffer,
or an internal element count tracker.
When asked to pop:
- If the length of `data` is 0, we raise an error .
- We can either pop based on value or based on eligibility.
- This implementation supports the most common readiness predicate - whether an element is 'ripe',
or time-ready (the inputted time is >= the element's specified readiness time).
- This implementation supports the most common readiness predicate -
whether an element is 'ripe', or time-ready
(the inputted time is >= the element's specified readiness time).
- If a value is passed in, we pop the first (lowest-rank) instance of that value which is 'ripe'.
- If a value is passed in, we pop the first (lowest-rank) instance of that
value which is 'ripe'.
- If no value is passed in but a time is,
we pop the first (lowest-rank) value that passes the predicate.
"""
Expand All @@ -174,7 +183,8 @@ def push(self, val, rank=0, time=0) -> None:
raise QueueError("Overflow")

# Left-shift the rank by 32 and add in the insertion count.
# With every push, we modify the insertion count as to reduce any possible duplicate ranks.
# With every push, we modify the insertion count as to reduce any
# possible duplicate ranks.

heapq.heappush(
self.data, RankValue(((rank << 32) + self.insertion_count), (val, time))
Expand All @@ -194,7 +204,8 @@ def query(self, time=0, val=None) -> Optional[int]:
while len(self.data) > 0:
# Check for eligibility
if self.is_ripe(time) and (val is None or self.data[0].value[0] == val):
# If eligible, we pop the element and push all cached elements back into the heap
# If eligible, we pop the element and push all cached elements
# back into the heap
result = heapq.heappop(self.data)

for elem in temp:
Expand All @@ -205,7 +216,8 @@ def query(self, time=0, val=None) -> Optional[int]:
# After each iteration, pop the current element so we can scan down the heap
temp.append(heapq.heappop(self.data))

# If no eligible elements are found, repopulate the data heap with cached elements
# If no eligible elements are found, repopulate the data heap with
# cached elements
for elem in temp:
heapq.heappush(self.data, elem)
raise QueueError("Underflow")
Expand All @@ -217,16 +229,20 @@ def pop(self, time=0, val=None) -> Optional[int]:
@dataclass
class Pieo:
"""A PIEO data structure.
PIEOs function as generalized PIFOs, but popping and pushing supports the 'extract-out' idea
rather than exclusively a 'first-out' operation. Elements can either be extracted by value
(pass in a value and obtain the lowest-rank element matching it), or by an eligibility predicate
(find the lowest-rank element matching the predicate).
PIEOs function as generalized PIFOs, but popping and pushing supports the
'extract-out' idea rather than exclusively a 'first-out' operation.
Elements can either be extracted by value
(pass in a value and obtain the lowest-rank element matching it), or by an
eligibility predicate (find the lowest-rank element matching the predicate).
In our implementation, we support time-based 'ripeness' predicates,
which check that an element's encoded 'readiness time' is earlier than the specified current time.
which check that an element's encoded 'readiness time' is earlier than the
specified current time.
The term 'ripe', as defined above, takes in an element (which has some encoded readiness time),
and a specified 'current time'. It checks that the element's readiness time is <= the current time.
The term 'ripe', as defined above, takes in an element
(which has some encoded readiness time),
and a specified 'current time'. It checks that the element's readiness time
is <= the current time.
For more info, consult https://dl.acm.org/doi/pdf/10.1145/3341302.3342090.
Expand All @@ -240,18 +256,22 @@ class Pieo:
When asked to push:
- If the PIEO is at length `max_len`, we raise an error.
- Otherwise, we insert the element into the PIEO such that the rank order stays increasing.
- To avoid ties between ranks, we left-shift the rank and then add either a custom buffer,
- Otherwise, we insert the element into the PIEO such that the rank order
stays increasing.
- To avoid ties between ranks, we left-shift the rank and then add either a
custom buffer,
or an internal element count tracker.
When asked to pop:
- If the length of `data` is 0, we raise an error .
- We can either pop based on value or based on eligibility.
- This implementation supports the most common readiness predicate - whether an element is 'ripe',
or time-ready (the inputted time is >= the element's specified readiness time).
- This implementation supports the most common readiness predicate - whether
an element is 'ripe', or time-ready
(the inputted time is >= the element's specified readiness time).
- If a value is passed in, we pop the first (lowest-rank) instance of that value which is 'ripe'.
- If a value is passed in, we pop the first (lowest-rank) instance of that
value which is 'ripe'.
- If no value is passed in but a time is,
we pop the first (lowest-rank) value that passes the predicate.
"""
Expand Down Expand Up @@ -289,15 +309,18 @@ def push(self, val, rank=0, time=0, insertion_count=None) -> None:
Inserts element such that rank ordering is preserved
"""

# Breaks ties and maintains FIFO order (can pass either custom insertion order or use PIEO internal one).
# Left-shifts the rank 32 bits, before adding either a passed in `insertion_count` parameter or the internal one.
# Breaks ties and maintains FIFO order (can pass either custom insertion
# order or use PIEO internal one).
# Left-shifts the rank 32 bits, before adding either a passed in
# `insertion_count` parameter or the internal one.
rank = (rank << 32) + (insertion_count or self.insertion_count)

# If there is no room left in the queue, raise an Overflow error
if len(self.data) == self.max_len:
raise QueueError("Overflow")

# If there are no elements in the queue, or the latest rank is higher than all others, append to the end
# If there are no elements in the queue, or the latest rank is higher
# than all others, append to the end
if len(self.data) == 0 or rank >= self.data[len(self.data) - 1]["rank"]:
self.data.append({"val": val, "time": time, "rank": rank})

Expand Down Expand Up @@ -350,11 +373,6 @@ def pop(self, time=0, val=None, return_rank=False) -> Optional[int]:

return self.query(time, val, True, return_rank)

def peek(self, time=0, val=None, return_rank=False) -> Optional[int]:
"""Peeks a PIEO. See query() for specifics."""

return self.query(time, val, False, return_rank)


@dataclass
class PCQ:
Expand Down Expand Up @@ -451,7 +469,8 @@ def query(self, time=0, val=None) -> Optional[int]:
raise QueueError(str(self.data) + "Underflow")

def pop(self, time=0, val=None) -> Optional[int]:
"""Pops a PCQ. If we iterate through every bucket and can't find a value, raise underflow."""
"""Pops a PCQ. If we iterate through every bucket and can't find a value,
raise underflow."""

return self.query(time, val)

Expand Down Expand Up @@ -494,12 +513,12 @@ class RRQueue:
round robin policy. If a flow is silent when it is its turn, that flow
simply skips its turn and the next flow is offered service.
Supports the operations `push`, `pop`, and `peek`.
Supports the operations `push` and `pop`.
It takes in a list `boundaries` that must be of length `n`, using which the
client can divide the incoming traffic into `n` flows.
For example, if n = 3 and the client passes boundaries [133, 266, 400],
packets will be divided into three flows: [0, 133], [134, 266], [267, 400].
The argument `subqueues` is a list of subqueues, which can be any type of
The argument `subqueues` is a list of subqueues, which can be any type of
queue from this module.
- At push, we check the `boundaries` list to determine which flow to push to.
Expand All @@ -509,10 +528,6 @@ class RRQueue:
- Pop first tries to pop from `hot`. If this succeeds, great. If it fails,
it increments `hot` and therefore continues to check all other flows
in round robin fashion.
- Peek allows the client to see which element is at the head of the queue
without removing it. Thus, peek works in a similar fashion to `pop`, except
`hot` is restored to its original value at the every end.
Further, nothing is actually dequeued.
"""

def __init__(self, n, boundaries, subqueues, max_len: int):
Expand Down Expand Up @@ -568,13 +583,13 @@ def __len__(self) -> int:
class StrictPifo:
"""
This is a version of a PIFO generalized to `n` flows, with a strict policy.
Flows have a strict order of priority, which determines popping and peeking
Flows have a strict order of priority, which determines popping
order. If the highest priority flow is silent when it is its turn, that flow
simply skips its turn and the next flow is offered service. If a higher
priority flow get pushed to in the interim, the next call to pop/peek will
priority flow get pushed to in the interim, the next call to pop will
return from that flow.
Supports the operations `push`, `pop`, and `peek`.
Supports the operations `push`, and `pop`.
It takes in a list `boundaries` that must be of length `n`, using which the
client can divide the incoming traffic into `n` flows.
For example, if n = 3 and the client passes boundaries [133, 266, 400],
Expand All @@ -593,9 +608,6 @@ class StrictPifo:
and 305 would end up in flow 2 since 266 <= 305 <= 400.
- Pop first tries to pop from `order[0]`. If this succeeds, great. If it fails,
it tries `order[1]`, etc.
- Peek allows the client to see which element is at the head of the queue
without removing it. Thus, peek works in a similar fashion to `pop`. Further,
nothing is actually dequeued.
"""

def __init__(self, n, boundaries, order, subqueues, max_len: int):
Expand Down Expand Up @@ -642,23 +654,6 @@ def pop(self, *_):
except QueueError:
self.next_priority()

def peek(self, *_) -> Optional[int]:
"""Peeks into the PIFO."""
if self.pifo_len == 0:
raise QueueError("Cannot peek into empty PIFO.")

original_priority = self.priority
while True:
try:
val = self.data[self.priority].peek()
if val is not None:
self.priority = original_priority
return val
else:
self.next_priority()
except QueueError:
self.next_priority()

def __len__(self) -> int:
return self.pifo_len

Expand Down
Loading

0 comments on commit 8d3708d

Please sign in to comment.