Skip to content

Commit

Permalink
Queues: randomized testing for binary heaps (#2174)
Browse files Browse the repository at this point in the history
* A few scheduling transactions

- WARNING: the pifo is broken; it doesn't pass the big random test

* Make heaps bigger!

- both pifo and fifo tests pass with large heaps

* Queues all working!

* More docs, fewer magic numbers, simpler control

- slightly refactor control logic in binheap.py
- make rank and value widths arguments to `insert_binheap`
- add docs to stable_binheap, binheap, and fifo. TBD: pifo

* Add pifo docs

* full -> is_full

* Switch to non-piezo tests

* No more magic numbers in gen_queue_data

* Fix error handling logic in binheap and pifo

* Integrate stable_binheap testing with queue_call

* Merge pifo changes from main

* Fix typos

* Neaten a list

---------

Co-authored-by: Anshuman Mohan <[email protected]>
  • Loading branch information
polybeandip and anshumanmohan authored Jun 21, 2024
1 parent 2d75684 commit cd0791c
Show file tree
Hide file tree
Showing 12 changed files with 160,198 additions and 166 deletions.
13 changes: 13 additions & 0 deletions calyx-py/calyx/binheap_oracle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# For usage, see gen_queue_data_expect.sh

import sys
import calyx.queues as queues
from calyx import queue_util

if __name__ == "__main__":
    # Positional CLI arguments: the command budget, then the heap capacity.
    # The capacity is not called `len` so that the builtin stays usable here.
    max_cmds, queue_len = int(sys.argv[1]), int(sys.argv[2])
    # Optional flag; forwarded to `operate_queue` as its `keepgoing` argument.
    keepgoing = "--keepgoing" in sys.argv
    # The input JSON (commands, values, ranks) is read from stdin.
    commands, values, ranks = queue_util.parse_json()
    binheap = queues.Binheap(queue_len)
    ans = queues.operate_queue(
        binheap, max_cmds, commands, values, ranks=ranks, keepgoing=keepgoing
    )
    # The answer memory, together with the inputs, is printed to stdout as JSON.
    queue_util.dump_json(ans, commands, values, ranks=ranks)
6 changes: 3 additions & 3 deletions calyx-py/calyx/fifo_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
if __name__ == "__main__":
max_cmds, len = int(sys.argv[1]), int(sys.argv[2])
keepgoing = "--keepgoing" in sys.argv
commands, values = queue_util.parse_json()
commands, values, _ = queue_util.parse_json()
fifo = queues.Fifo(len)
ans = queues.operate_queue(commands, values, fifo, max_cmds, keepgoing=keepgoing)
queue_util.dump_json(commands, values, ans)
ans = queues.operate_queue(fifo, max_cmds, commands, values, keepgoing=keepgoing)
queue_util.dump_json(ans, commands, values)
6 changes: 6 additions & 0 deletions calyx-py/calyx/gen_queue_data_expect.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ for queue_kind in fifo pifo pifo_tree; do
cat ../test/correctness/queues/$queue_kind.data | python3 ${queue_kind}_oracle.py $num_cmds $queue_size --keepgoing > ../test/correctness/queues/$queue_kind.expect
[[ "$queue_kind" != "pifo_tree" ]] && cp ../test/correctness/queues/$queue_kind.expect ../test/correctness/queues/binheap/$queue_kind.expect
done


# For the Binary Heap, we drop piezo mode and enable ranks for data gen and
# use binheap_oracle to generate the expected output.
# Step 1: generate the input data. `--use-rank` makes queue_data_gen.py emit a
# `ranks` memory in addition to `commands`, `values`, and `ans_mem`.
python3 queue_data_gen.py $num_cmds --use-rank > ../test/correctness/queues/binheap/stable_binheap.data
# Step 2: feed that data to binheap_oracle.py, which reads its JSON from stdin.
# `--keepgoing` is passed through to the oracle's queue-operating loop.
cat ../test/correctness/queues/binheap/stable_binheap.data | python3 binheap_oracle.py $num_cmds $queue_size --keepgoing > ../test/correctness/queues/binheap/stable_binheap.expect
6 changes: 3 additions & 3 deletions calyx-py/calyx/pifo_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
if __name__ == "__main__":
max_cmds, len = int(sys.argv[1]), int(sys.argv[2])
keepgoing = "--keepgoing" in sys.argv
commands, values = queue_util.parse_json()
commands, values, _ = queue_util.parse_json()

# Our PIFO is simple: it just orchestrates two FIFOs. The boundary is 200.
pifo = queues.Pifo(queues.Fifo(len), queues.Fifo(len), 200, len)

ans = queues.operate_queue(commands, values, pifo, max_cmds, keepgoing=keepgoing)
queue_util.dump_json(commands, values, ans)
ans = queues.operate_queue(pifo, max_cmds, commands, values, keepgoing=keepgoing)
queue_util.dump_json(ans, commands, values)
6 changes: 3 additions & 3 deletions calyx-py/calyx/pifo_tree_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

max_cmds, len = int(sys.argv[1]), int(sys.argv[2])
keepgoing = "--keepgoing" in sys.argv
commands, values = queue_util.parse_json()
commands, values, _ = queue_util.parse_json()

# Our PIFO is a little complicated: it is a tree of queues.
# The root has two children, which are PIFOs.
Expand All @@ -28,5 +28,5 @@
len,
)

ans = queues.operate_queue(commands, values, pifo, max_cmds, keepgoing=keepgoing)
queue_util.dump_json(commands, values, ans)
ans = queues.operate_queue(pifo, max_cmds, commands, values, keepgoing=keepgoing)
queue_util.dump_json(ans, commands, values)
84 changes: 66 additions & 18 deletions calyx-py/calyx/queue_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import calyx.builder as cb


def insert_runner(prog, queue, name, num_cmds, stats_component=None):
def insert_runner(prog, queue, name, num_cmds, use_ranks, stats_component=None):
"""Inserts the component `name` into the program.
This will be used to `invoke` the component `queue` and feed it _one command_.
This component is designed to be invoked by some other component, and does not
Expand Down Expand Up @@ -57,17 +57,33 @@ def insert_runner(prog, queue, name, num_cmds, stats_component=None):
# Our memories and registers, all of which are passed to us by reference.
commands = runner.seq_mem_d1("commands", 2, num_cmds, 32, is_ref=True)
values = runner.seq_mem_d1("values", 32, num_cmds, 32, is_ref=True)
ranks = (
runner.seq_mem_d1("ranks", 32, num_cmds, 32, is_ref=True) if use_ranks else None
)
has_ans = runner.reg(1, "has_ans", is_ref=True)
ans = runner.reg(32, "component_ans", is_ref=True)
err = runner.reg(1, "component_err", is_ref=True)

i = runner.reg(32) # The index of the command we're currently processing
cmd = runner.reg(2) # The command we're currently processing
value = runner.reg(32) # The value we're currently processing
rank = runner.reg(32) # The rank we're currently processing

load_data = (
[ # `cmd := commands[i]`, `value := values[i]`, `rank := ranks[i]`
runner.mem_load_d1(commands, i.out, cmd, "write_cmd"),
runner.mem_load_d1(values, i.out, value, "write_value"),
runner.mem_load_d1(ranks, i.out, rank, "write_rank"),
]
if use_ranks
else [ # `cmd := commands[i]`, `value := values[i]`
runner.mem_load_d1(commands, i.out, cmd, "write_cmd"),
runner.mem_load_d1(values, i.out, value, "write_value"),
]
)

runner.control += [
runner.mem_load_d1(commands, i.out, cmd, "write_cmd"), # `cmd := commands[i]`
runner.mem_load_d1(values, i.out, value, "write_value"), # `value := values[i]`
load_data,
(
cb.invoke( # Invoke the queue with a stats component.
queue,
Expand All @@ -78,12 +94,23 @@ def insert_runner(prog, queue, name, num_cmds, stats_component=None):
ref_stats=stats_cell,
)
if stats_component
else cb.invoke( # Invoke the queue without a stats component.
queue,
in_cmd=cmd.out,
in_value=value.out,
ref_ans=ans,
ref_err=err,
else (
cb.invoke( # Invoke the queue with ranks.
queue,
in_cmd=cmd.out,
in_value=value.out,
in_rank=rank.out,
ref_ans=ans,
ref_err=err,
)
if use_ranks
else cb.invoke( # Invoke the queue without stats or ranks.
queue,
in_cmd=cmd.out,
in_value=value.out,
ref_ans=ans,
ref_err=err,
)
)
),
# We're back from the invoke, and it's time for some post-mortem analysis.
Expand All @@ -107,6 +134,7 @@ def insert_main(
queue,
num_cmds,
keepgoing=False,
use_ranks=False,
controller=None,
stats_component=None,
):
Expand All @@ -118,7 +146,9 @@ def insert_main(

stats = main.cell("stats_main", stats_component) if stats_component else None
controller = main.cell("controller", controller) if controller else None
dataplane = insert_runner(prog, queue, "dataplane", num_cmds, stats_component)
dataplane = insert_runner(
prog, queue, "dataplane", num_cmds, use_ranks, stats_component
)
dataplane = main.cell("dataplane", dataplane)

has_ans = main.reg(1)
Expand All @@ -128,6 +158,11 @@ def insert_main(
commands = main.seq_mem_d1("commands", 2, num_cmds, 32, is_external=True)
values = main.seq_mem_d1("values", 32, num_cmds, 32, is_external=True)
ans_mem = main.seq_mem_d1("ans_mem", 32, num_cmds, 32, is_external=True)
ranks = (
main.seq_mem_d1("ranks", 32, num_cmds, 32, is_external=True)
if use_ranks
else None
)
i = main.reg(32) # A counter for how many times we have invoked the dataplane.
j = main.reg(32) # The index on the answer-list we'll write to
keep_looping = main.and_(1) # If this is high, we keep going. Otherwise, we stop.
Expand Down Expand Up @@ -163,14 +198,27 @@ def insert_main(
ref_stats_runner=stats,
)
if stats_component
else cb.invoke(
# Invoke the dataplane component without a stats component.
dataplane,
ref_commands=commands,
ref_values=values,
ref_has_ans=has_ans,
ref_component_ans=dataplane_ans,
ref_component_err=dataplane_err,
else (
cb.invoke(
# Invoke the dataplane component with ranks.
dataplane,
ref_commands=commands,
ref_values=values,
ref_ranks=ranks,
ref_has_ans=has_ans,
ref_component_ans=dataplane_ans,
ref_component_err=dataplane_err,
)
if use_ranks
else cb.invoke(
# Invoke the dataplane component without stats or ranks.
dataplane,
ref_commands=commands,
ref_values=values,
ref_has_ans=has_ans,
ref_component_ans=dataplane_ans,
ref_component_err=dataplane_err,
)
)
),
# If the dataplane component has an answer,
Expand Down
19 changes: 15 additions & 4 deletions calyx-py/calyx/queue_data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def no_err_cmds_list(queue_size, num_cmds):
return commands


def dump_json(num_cmds, no_err: bool, queue_size: Optional[int] = None):
def dump_json(num_cmds, use_rank: bool, no_err: bool, queue_size: Optional[int] = None):
"""Prints a JSON representation of the data to stdout.
The data itself is populated randomly, following certain rules:
- It has three "memories": `commands`, `values`, and `ans_mem`.
Expand Down Expand Up @@ -89,6 +89,14 @@ def dump_json(num_cmds, no_err: bool, queue_size: Optional[int] = None):
"format": format_gen(32),
}
}
ranks = {
"ranks": {
"data": [random.randint(0, 400) for _ in range(num_cmds)],
# The `ranks` memory has `num_cmds` items, which are all
# random values between 0 and 400.
"format": format_gen(32),
}
}
ans_mem = {
"ans_mem": {
"data": [0] * num_cmds,
Expand All @@ -97,15 +105,18 @@ def dump_json(num_cmds, no_err: bool, queue_size: Optional[int] = None):
}
}

print(json.dumps(commands | values | ans_mem, indent=2))

if use_rank:
print(json.dumps(commands | values | ranks | ans_mem, indent=2))
else:
print(json.dumps(commands | values | ans_mem, indent=2))

if __name__ == "__main__":
# Accept a flag that we pass to dump_json.
# This says whether we should use the special no_err helper.
random.seed(5)
num_cmds = int(sys.argv[1])
no_err = "--no-err" in sys.argv
use_rank = "--use-rank" in sys.argv
if no_err:
queue_size = int(sys.argv[3])
dump_json(num_cmds, no_err, queue_size if no_err else None)
dump_json(num_cmds, use_rank, no_err, queue_size if no_err else None)
28 changes: 21 additions & 7 deletions calyx-py/calyx/queue_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,28 @@ def parse_json():
data = json.load(sys.stdin)
commands = data["commands"]["data"]
values = data["values"]["data"]
return commands, values
ranks = None
try:
ranks = data["ranks"]["data"]
except KeyError:
pass
return commands, values, ranks


def dump_json(commands, values, ans_mem):
def dump_json(ans_mem, commands, values, ranks=None):
"""Prints a JSON representation of the data to stdout."""
payload = {
"ans_mem": ans_mem,
"commands": commands,
"values": values,
}
payload = {}
if ranks == None:
payload = {
"ans_mem": ans_mem,
"commands": commands,
"values": values
}
else:
payload = {
"ans_mem": ans_mem,
"commands": commands,
"ranks": ranks,
"values": values
}
print(json.dumps(payload, indent=2))
59 changes: 53 additions & 6 deletions calyx-py/calyx/queues.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from typing import List
from typing import Optional
from dataclasses import dataclass, field
from typing import Any, List, Optional
import heapq


class QueueError(Exception):
Expand Down Expand Up @@ -149,13 +149,57 @@ def __len__(self) -> int:
return self.pifo_len


def operate_queue(commands, values, queue, max_cmds, keepgoing=False):
@dataclass(order=True)
class RankValue:
    """A heap entry that is ordered by `priority` alone.

    `value` is excluded from comparisons, so payloads never need to be
    orderable themselves.
    """

    # `Binheap` stores a `(rank, insertion_number)` tuple here.
    priority: Any
    value: Any = field(compare=False)


@dataclass
class Binheap:
    """A minimum Binary Heap data structure.
    Supports the operations `push`, `pop`, and `peek`.

    The entry with the smallest rank leaves first; entries with equal rank
    leave in the order they were pushed. At most `max_len` entries are held.
    """

    def __init__(self, max_len):
        self.heap = []
        self.len = 0
        # Counts every push ever made; used only to break ties between ranks.
        self.counter = 0
        self.max_len = max_len

    def push(self, rnk, val):
        """Pushes `(rnk, val)` to the Binary Heap.

        Raises `QueueError` if the heap already holds `max_len` entries.
        """
        if self.len == self.max_len:
            raise QueueError("Cannot push to full Binary Heap.")
        self.counter += 1
        self.len += 1
        # Order by rank first and insertion number second. A tuple key keeps
        # the two fields separate, so a large counter can never spill into the
        # rank (as it could when both were packed into a single integer), and
        # any mutually comparable rank type works.
        heapq.heappush(self.heap, RankValue((rnk, self.counter), val))

    def pop(self) -> Optional[int]:
        """Pops the Binary Heap: removes and returns the minimum-rank value.

        Raises `QueueError` if the heap is empty.
        """
        if self.len == 0:
            raise QueueError("Cannot pop from empty Binary Heap.")
        self.len -= 1
        return heapq.heappop(self.heap).value

    def peek(self) -> Optional[int]:
        """Peeks into the Binary Heap: returns the minimum-rank value
        without removing it.

        Raises `QueueError` if the heap is empty.
        """
        if self.len == 0:
            raise QueueError("Cannot peek from empty Binary Heap.")
        return self.heap[0].value

    def __len__(self) -> int:
        return self.len


def operate_queue(queue, max_cmds, commands, values, ranks=None, keepgoing=False):
"""Given the two lists, one of commands and one of values.
Feed these into our queue, and return the answer memory.
"""

ans = []
for cmd, val in zip(commands, values):
ranks_or_values = values if ranks == None else ranks
for cmd, val, rnk in zip(commands, values, ranks_or_values):
if cmd == 0:
try:
ans.append(queue.pop())
Expand All @@ -174,7 +218,10 @@ def operate_queue(commands, values, queue, max_cmds, keepgoing=False):

elif cmd == 2:
try:
queue.push(val)
if ranks == None:
queue.push(val)
else:
queue.push(rnk, val)
except QueueError:
if keepgoing:
continue
Expand Down
Loading

0 comments on commit cd0791c

Please sign in to comment.