Skip to content

Commit

Permalink
Round Robin oracle and eDSL (#2177)
Browse files Browse the repository at this point in the history
This PR makes progress towards #1810. It implements the python oracle
for PIFOs generalized to n flows, now known as Round Robin queues. Just
as with the PIFO, if a flow falls silent, the remaining flows will take
their turns. That flow effectively skips its turn. To re-generate the
test files with 20000 commands and a max length of 16, type in the
command line after navigating to the directory calyx/calyx-py/calyx

```
./gen_queue_data_expect.sh
```

Additionally, this PR also implements the Calyx version of Round Robin
queues in rr_queue.py. This was originally supposed to be its own PR,
but I thought it might be more complicated if I branched off a branch.
To run these tests, type in the command line
```
runt -i "rr_queue"
```

---------

Co-authored-by: Cassandra Nicole Sziklai <[email protected]>
Co-authored-by: Anshuman Mohan <[email protected]>
Co-authored-by: Anshuman Mohan <[email protected]>
  • Loading branch information
4 people authored Jul 2, 2024
1 parent ecf05cc commit ba0d5f0
Show file tree
Hide file tree
Showing 24 changed files with 720,651 additions and 5 deletions.
8 changes: 8 additions & 0 deletions calyx-py/calyx/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,14 @@ def le_store_in_reg(
cell = self.le(width, cellname, signed)
return self.op_store_in_reg(cell, left, right, cell.name, 1, ans_reg)

def ge_store_in_reg(
self, left, right, ans_reg=None, cellname=None, width=None, signed=False
):
"""Inserts wiring into `self` to perform `reg := left >= right`."""
width = width or self.try_infer_width(width, left, right)
cell = self.ge(width, cellname, signed)
return self.op_store_in_reg(cell, left, right, cell.name, 1, ans_reg)

def mult_store_in_reg(
self, left, right, ans_reg=None, cellname=None, width=None, signed=False
):
Expand Down
8 changes: 8 additions & 0 deletions calyx-py/calyx/gen_queue_data_expect.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ done
# use binheap_oracle to generate the expected output
python3 queue_data_gen.py $num_cmds --use-rank > ../test/correctness/queues/binheap/stable_binheap.data
cat ../test/correctness/queues/binheap/stable_binheap.data | python3 binheap_oracle.py $num_cmds $queue_size --keepgoing > ../test/correctness/queues/binheap/stable_binheap.expect

# For the Round Robin queues, we drop piezo mode as well and use rrqueue_oracle to
# generate the expected output for queues with 2..7 flows. This generates 6 data expect file pairs.

for n in {2..7}; do
python3 queue_data_gen.py $num_cmds > ../test/correctness/queues/rr_queues/rr_queue_${n}flows.data
cat ../test/correctness/queues/rr_queues/rr_queue_${n}flows.data | python3 rrqueue_oracle.py $num_cmds $queue_size $n --keepgoing > ../test/correctness/queues/rr_queues/rr_queue_${n}flows.expect
done
99 changes: 95 additions & 4 deletions calyx-py/calyx/queues.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Any, List, Optional
from typing import Any, Optional
import heapq


Expand Down Expand Up @@ -152,7 +152,7 @@ def __len__(self) -> int:
@dataclass(order=True)
class RankValue:
priority: int
value: Any=field(compare=False)
value: Any = field(compare=False)


@dataclass
Expand Down Expand Up @@ -192,13 +192,104 @@ def __len__(self) -> int:
return self.len


@dataclass
class RRQueue:
"""
This is a version of a PIFO generalized to `n` flows, with a work conserving
round robin policy. If a flow is silent when it is its turn, that flow
simply skips its turn and the next flow is offered service.
Supports the operations `push`, `pop`, and `peek`.
It takes in a list `boundaries` that must be of length `n`, using which the
client can divide the incoming traffic into `n` flows.
For example, if n = 3 and the client passes boundaries [133, 266, 400],
packets will be divided into three flows: [0, 133], [134, 266], [267, 400].
- At push, we check the `boundaries` list to determine which flow to push to.
Take the boundaries example given earlier, [133, 266, 400].
If we push the value 89, it will end up in flow 0 becuase 89 <= 133,
and 305 would end up in flow 2 since 266 <= 305 <= 400.
- Pop first tries to pop from `hot`. If this succeeds, great. If it fails,
it increments `hot` and therefore continues to check all other flows
in round robin fashion.
- Peek allows the client to see which element is at the head of the queue
without removing it. Thus, peek works in a similar fashion to `pop`, except
`hot` is restored to its original value at the every end.
Further, nothing is actually dequeued.
"""

def __init__(self, n, boundaries, max_len: int):
self.hot = 0
self.n = n
self.pifo_len = 0
self.boundaries = boundaries
self.data = [Fifo(max_len) for _ in range(n)]

self.max_len = max_len
assert (
self.pifo_len <= self.max_len
) # We can't be initialized with a PIFO that is too long.

def push(self, val: int):
"""Pushes `val` to the PIFO."""
if self.pifo_len == self.max_len:
raise QueueError("Cannot push to full PIFO.")
for fifo, boundary in zip(self.data, self.boundaries):
if val <= boundary:
fifo.push(val)
self.pifo_len += 1
break

def increment_hot(self):
"""Increments `hot`, taking into account wraparound."""
self.hot = 0 if self.hot == (self.n - 1) else self.hot + 1

def pop(self) -> Optional[int]:
"""Pops the PIFO by popping some internal FIFO.
Updates `hot` to be one more than the index of the internal FIFO that
we did pop.
"""
if self.pifo_len == 0:
raise QueueError("Cannot pop from empty PIFO.")

while True:
try:
val = self.data[self.hot].pop()
if val is not None:
self.increment_hot()
self.pifo_len -= 1
return val
self.increment_hot()
except QueueError:
self.increment_hot()

def peek(self) -> Optional[int]:
"""Peeks into the PIFO. Does not affect what `hot` is."""
if self.pifo_len == 0:
raise QueueError("Cannot peek into empty PIFO.")

original_hot = self.hot
while True:
try:
val = self.data[self.hot].peek()
if val is not None:
self.hot = original_hot
return val
self.increment_hot()
except QueueError:
self.increment_hot()

def __len__(self) -> int:
return self.pifo_len


def operate_queue(queue, max_cmds, commands, values, ranks=None, keepgoing=False):
"""Given the two lists, one of commands and one of values.
Feed these into our queue, and return the answer memory.
"""

ans = []
ranks_or_values = values if ranks == None else ranks
ranks_or_values = values if ranks is None else ranks
for cmd, val, rnk in zip(commands, values, ranks_or_values):
if cmd == 0:
try:
Expand All @@ -218,7 +309,7 @@ def operate_queue(queue, max_cmds, commands, values, ranks=None, keepgoing=False

elif cmd == 2:
try:
if ranks == None:
if ranks is None:
queue.push(val)
else:
queue.push(rnk, val)
Expand Down
35 changes: 35 additions & 0 deletions calyx-py/calyx/rrqueue_oracle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# For usage, see gen_queue_data_expect.sh
import sys
import calyx.queues as queues
from calyx import queue_util

if __name__ == "__main__":
num_cmds, len, numflows = int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])
keepgoing = "--keepgoing" in sys.argv
commands, values, _ = queue_util.parse_json()

# In reality, we would allow the user to specify the boundaries via
# command line arguments or a configuration file. For now, we hardcode them
# as a function of the number of flows.
if numflows == 2:
boundaries = [200, 400]
elif numflows == 3:
boundaries = [133, 266, 400]
elif numflows == 4:
boundaries = [100, 200, 300, 400]
elif numflows == 5:
boundaries = [80, 160, 240, 320, 400]
elif numflows == 6:
boundaries = [66, 100, 200, 220, 300, 400]
elif numflows == 7:
boundaries = [50, 100, 150, 200, 250, 300, 400]
else:
raise ValueError("Unsupported number of flows")

# Our Round Robin Queue orchestrates n FIFOs, in this case provided as
# a command line argument. It orchestrates the FIFOs in a round-robin fashion.
pifo = queues.RRQueue(numflows, boundaries, len)

ans = queues.operate_queue(pifo, num_cmds, commands, values, keepgoing=keepgoing)

queue_util.dump_json(ans, commands, values)
Loading

0 comments on commit ba0d5f0

Please sign in to comment.