Skip to content

Commit

Permalink
Strict PIFO oracle and eDSL implementation (#2189)
Browse files Browse the repository at this point in the history
This PR ties off the last half of #1810. It implements the python oracle
and Calyx eDSL for strict PIFOs, which are generalized to n flows. Flows
have a strict order of priority, which determines popping and peeking
order. If the highest priority flow is silent when it is its turn, that
flow simply skips its turn and the next flow is offered service. If that
higher priority flow get pushed to in the interim, the next call to
pop/peek will return from that flow. To re-generate the test files with
20000 commands and a max length of 16, type in the command line after
navigating to the directory calyx/calyx-py/calyx
```
./gen_queue_data_expect.sh
```
To run the runt tests on the eDSL implementation, type 
```
runt -i "strict"
```

---------

Co-authored-by: Cassandra Nicole Sziklai <[email protected]>
Co-authored-by: Anshuman Mohan <[email protected]>
Co-authored-by: Anshuman Mohan <[email protected]>
  • Loading branch information
4 people committed Jul 3, 2024
1 parent f59b893 commit 75c5cb8
Show file tree
Hide file tree
Showing 44 changed files with 600,516 additions and 57 deletions.
9 changes: 9 additions & 0 deletions calyx-py/calyx/gen_queue_data_expect.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,12 @@ for n in {2..7}; do
python3 queue_data_gen.py $num_cmds > ../test/correctness/queues/rr_queues/rr_queue_${n}flows.data
cat ../test/correctness/queues/rr_queues/rr_queue_${n}flows.data | python3 rrqueue_oracle.py $num_cmds $queue_size $n --keepgoing > ../test/correctness/queues/rr_queues/rr_queue_${n}flows.expect
done

# For Strict queues, we use strict_queue_oracle.py to generate the expected output
# for queues with 2..6 flows, each with a different strict ordering. This generates 5
# expect file pairs.

for n in {2..6}; do
python3 queue_data_gen.py $num_cmds > ../test/correctness/queues/strict_queues/strict_${n}flows.data
cat ../test/correctness/queues/strict_queues/strict_${n}flows.data | python3 strict_queue_oracle.py $num_cmds $queue_size $n --keepgoing > ../test/correctness/queues/strict_queues/strict_${n}flows.expect
done
101 changes: 101 additions & 0 deletions calyx-py/calyx/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def peek(self) -> Optional[int]:
def __len__(self) -> int:
return len(self.data)

def __str__(self):
return str(self.data)


@dataclass
class Pifo:
Expand Down Expand Up @@ -283,6 +286,104 @@ def __len__(self) -> int:
return self.pifo_len


@dataclass
class StrictPifo:
"""
This is a version of a PIFO generalized to `n` flows, with a strict policy.
Flows have a strict order of priority, which determines popping and peeking
order. If the highest priority flow is silent when it is its turn, that flow
simply skips its turn and the next flow is offered service. If a higher
priority flow get pushed to in the interim, the next call to pop/peek will
return from that flow.
Supports the operations `push`, `pop`, and `peek`.
It takes in a list `boundaries` that must be of length `n`, using which the
client can divide the incoming traffic into `n` flows.
For example, if n = 3 and the client passes boundaries [133, 266, 400],
packets will be divided into three flows: [0, 133], [134, 266], [267, 400].
It takes a list `order` that must be of length `n`, which specifies the order
of priority of the flows. For example, if n = 3 and the client passes order
[1, 2, 0], flow 1 (packets in range [134, 266]) is first priority, flow 2
(packets in range [267, 400]) is second priority, and flow 0 (packets in range
[0, 133]) is last priority.
- At push, we check the `boundaries` list to determine which flow to push to.
Take the boundaries example given earlier, [133, 266, 400].
If we push the value 89, it will end up in flow 0 becuase 89 <= 133,
and 305 would end up in flow 2 since 266 <= 305 <= 400.
- Pop first tries to pop from `order[0]`. If this succeeds, great. If it fails,
it tries `order[1]`, etc.
- Peek allows the client to see which element is at the head of the queue
without removing it. Thus, peek works in a similar fashion to `pop`. Further,
nothing is actually dequeued.
"""

def __init__(self, n, boundaries, order, max_len: int):
self.order = order
self.priority = 0
self.n = n
self.pifo_len = 0
self.boundaries = boundaries
self.data = [Fifo(max_len) for _ in range(n)]

self.max_len = max_len

def push(self, val: int):
"""Works the same as in RRQueue. Pushes `val` to the PIFO."""
if self.pifo_len == self.max_len:
raise QueueError("Cannot push to full PIFO.")
for b in range(self.n):
if val <= self.boundaries[b]:
idx = self.order.index(b)
self.data[idx].push(val)
self.pifo_len += 1
break

def next_priority(self):
"""Increments priority, taking into account wrap around."""
self.priority = 0 if self.priority == (self.n - 1) else self.priority + 1

def pop(self):
"""Pops the PIFO."""
if self.pifo_len == 0:
raise QueueError("Cannot pop from empty PIFO.")

original_priority = self.priority

while True:
try:
val = self.data[self.priority].pop()
if val is not None:
self.pifo_len -= 1
self.priority = original_priority
return val
else:
self.next_priority()
except QueueError:
self.next_priority()

def peek(self) -> Optional[int]:
"""Peeks into the PIFO."""
if self.pifo_len == 0:
raise QueueError("Cannot peek into empty PIFO.")

original_priority = self.priority
while True:
try:
val = self.data[self.priority].peek()
if val is not None:
self.priority = original_priority
return val
else:
self.next_priority()
except QueueError:
self.next_priority()

def __len__(self) -> int:
return self.pifo_len


def operate_queue(queue, max_cmds, commands, values, ranks=None, keepgoing=False):
"""Given the two lists, one of commands and one of values.
Feed these into our queue, and return the answer memory.
Expand Down
35 changes: 35 additions & 0 deletions calyx-py/calyx/strict_queue_oracle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import sys
import calyx.queues as queues
from calyx import queue_util

if __name__ == "__main__":
num_cmds, len, numflows = int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])
keepgoing = "--keepgoing" in sys.argv
commands, values, _ = queue_util.parse_json()

if numflows == 2:
boundaries = [200, 400]
order = [1, 0]
elif numflows == 3:
boundaries = [133, 266, 400]
order = [1, 2, 0]
elif numflows == 4:
boundaries = [100, 200, 300, 400]
order = [3, 0, 2, 1]
elif numflows == 5:
boundaries = [80, 160, 240, 320, 400]
order = [0, 1, 2, 3, 4]
elif numflows == 6:
boundaries = [66, 100, 200, 220, 300, 400]
order = [3, 1, 5, 2, 4, 0]
else:
raise ValueError("Unsupported number of flows")

# Our Strict queue orchestrates n FIFOs. It takes in a list of
# boundaries of length n, as well as a list `order` which specifies the ranked
# order of the flows.
pifo = queues.StrictPifo(numflows, boundaries, order, len)

ans = queues.operate_queue(pifo, num_cmds, commands, values, keepgoing=keepgoing)

queue_util.dump_json(ans, commands, values)
6 changes: 0 additions & 6 deletions calyx-py/test/correctness/queues/rr_queues/rr_queue_2flows.py

This file was deleted.

6 changes: 0 additions & 6 deletions calyx-py/test/correctness/queues/rr_queues/rr_queue_3flows.py

This file was deleted.

6 changes: 0 additions & 6 deletions calyx-py/test/correctness/queues/rr_queues/rr_queue_4flows.py

This file was deleted.

6 changes: 0 additions & 6 deletions calyx-py/test/correctness/queues/rr_queues/rr_queue_5flows.py

This file was deleted.

6 changes: 0 additions & 6 deletions calyx-py/test/correctness/queues/rr_queues/rr_queue_6flows.py

This file was deleted.

6 changes: 0 additions & 6 deletions calyx-py/test/correctness/queues/rr_queues/rr_queue_7flows.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
# pylint: disable=import-error
"""Common code factored out, to be imported by the different flow implementations."""
# pylint: disable=import-error
import os
import sys
import inspect

# Hackery to import `fifo` from the parent directory.
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)

import fifo
import calyx.builder as cb
import calyx.queue_call as qc

import calyx.py_ast as ast

# This determines the maximum possible length of the queue:
# The max length of the queue will be 2^QUEUE_LEN_FACTOR.
Expand All @@ -36,15 +33,18 @@ def invoke_subqueue(queue_cell, cmd, value, ans, err) -> cb.invoke:
)


def insert_rr_pifo(
prog,
name,
fifos,
boundaries,
numflows,
queue_len_factor=QUEUE_LEN_FACTOR,
def insert_queue(
prog, name, fifos, boundaries, numflows, order, round_robin, queue_len_factor=QUEUE_LEN_FACTOR
):
"""Inserts the component `pifo` into the program."""
"""
Inserts the component `pifo` into the program. If round_robin is true, it
inserts a round robin queue, otherwise it inserts a strict queue. `numflows`
is the number of flows, which must be an integer greater than 0. Boundaries
must be of length `numflows` + 1, where the first boundary is the smallest
number a value can take (eg. 0). `order` is used for strict queues to determine
the order of priority of the subqueues. `order` must be a list of length
`numflows`.
"""

pifo: cb.ComponentBuilder = prog.component(name)
cmd = pifo.input("cmd", 2) # the size in bits is 2
Expand Down Expand Up @@ -105,7 +105,16 @@ def insert_rr_pifo(
invoke_subqueue(fifo_cells[b], cmd, value, ans, err)
# In the specical case when b = 0,
# we don't need to check the lower bound and we can just `invoke`.
if b == 0
if b == 0 and round_robin

else
invoke_subqueue(fifo_cells[order.index(b)], cmd, value, ans, err)

if b == 0 and not round_robin
else cb.if_with(
pifo.gt_use(value, boundaries[b]), # value > boundaries[b]
invoke_subqueue(fifo_cells[order.index(b)], cmd, value, ans, err),)
if not round_robin
# Otherwise, we need to check the lower bound and `invoke`
# only if the value is in the interval.
else cb.if_with(
Expand All @@ -118,7 +127,7 @@ def insert_rr_pifo(
]
invoke_subqueues_value_guard = cb.par(
invoke_subqueues_value_guard_seq
) # Execute the above in parallel.
) # Execute in parallel.

incr_hot_wraparound = cb.if_with(
# If hot = numflows - 1, we need to wrap around to 0. Otherwise, we increment.
Expand All @@ -131,21 +140,25 @@ def insert_rr_pifo(
len_eq_0,
raise_err, # The queue is empty: underflow.
[ # The queue is not empty. Proceed.
copy_hot, # We remember `hot` so we can restore it later.
raise_err, # We raise err so we enter the loop body at least once.
cb.while_with(
err_is_high,
[ # We have entered the loop body because `err` is high.
# Either we are here for the first time,
# or we are here because the previous iteration raised an error
# and incremented `hot` for us.
# We'll try to pop from `fifo_cells[hot]`.
# We'll try to peek from `fifo_cells[hot]`.
# We'll pass it a lowered `err`.
lower_err,
invoke_subqueues_hot_guard,
incr_hot_wraparound, # Increment hot
], # Some pop succeeded. Its answer is our answer.
incr_hot_wraparound, # Increment hot: this will be used
# only if the current subqueue raised an error,
# and another iteration is needed.
],
),
len_decr,
restore_hot if not round_robin else ast.Empty
],
)

Expand Down Expand Up @@ -203,30 +216,37 @@ def insert_rr_pifo(
return pifo


def build(numflows):
def build(numflows, roundrobin):
"""Top-level function to build the program."""

if numflows == 2:
boundaries = [0, 200, 400]
order = [1, 0]
elif numflows == 3:
boundaries = [0, 133, 266, 400]
order = [1, 2, 0]
elif numflows == 4:
boundaries = [0, 100, 200, 300, 400]
order = [3, 0, 2, 1]
elif numflows == 5:
boundaries = [0, 80, 160, 240, 320, 400]
order = [0, 1, 2, 3, 4]
elif numflows == 6:
boundaries = [0, 66, 100, 200, 220, 300, 400]
order = [3, 1, 5, 2, 4, 0]
elif numflows == 7:
boundaries = [0, 50, 100, 150, 200, 250, 300, 400]
order = [0, 1, 2, 3, 4, 5, 6]
else:
raise ValueError("Unsupported number of flows")

num_cmds = int(sys.argv[1])
keepgoing = "--keepgoing" in sys.argv

prog = cb.Builder()
sub_fifos = [
fifo.insert_fifo(prog, f"fifo{i}", QUEUE_LEN_FACTOR) for i in range(numflows)
]
pifo = insert_rr_pifo(prog, "pifo", sub_fifos, boundaries, numflows)
qc.insert_main(prog, pifo, num_cmds)
pifo = insert_queue(prog, "pifo", sub_fifos, boundaries, numflows, order, roundrobin)
qc.insert_main(prog, pifo, num_cmds, keepgoing=keepgoing)
return prog.program
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import os
import sys
import inspect

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)

from gen_strict_or_rr import build

if __name__ == "__main__":
"""Invoke the top-level function to build the program, with 2 flows."""
build(2, True).emit()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import os
import sys
import inspect

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)

from gen_strict_or_rr import build

if __name__ == "__main__":
"""Invoke the top-level function to build the program, with 3 flows."""
build(3, True).emit()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import os
import sys
import inspect

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)

from gen_strict_or_rr import build

if __name__ == "__main__":
"""Invoke the top-level function to build the program, with 4 flows."""
build(4, True).emit()
Loading

0 comments on commit 75c5cb8

Please sign in to comment.