Skip to content

Commit

Permalink
csr.event: add event monitor.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jean-François Nguyen authored and whitequark committed Jun 25, 2020
1 parent 0385d67 commit 517638c
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 0 deletions.
1 change: 1 addition & 0 deletions nmigen_soc/csr/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .bus import *
from .event import *
106 changes: 106 additions & 0 deletions nmigen_soc/csr/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# nmigen: UnusedElaboratable=no

from nmigen import *

from . import Element, Multiplexer
from .. import event


__all__ = ["EventMonitor"]


class EventMonitor(Elaboratable):
"""Event monitor.
A monitor for subordinate event sources, with a CSR bus interface.
CSR registers
-------------
enable : ``self.src.event_map.size``, read/write
Enabled events. See :meth:`..event.EventMap.sources` for layout.
pending : ``self.src.event_map.size``, read/clear
Pending events. See :meth:`..event.EventMap.sources` for layout.
Parameters
----------
data_width : int
CSR bus data width. See :class:`..csr.Interface`.
alignment : int
CSR address alignment. See :class:`..memory.MemoryMap`.
trigger : :class:`..event.Source.Trigger`
Trigger mode. See :class:`..event.Source`.
"""
def __init__(self, *, data_width, alignment=0, trigger="level"):
choices = ("level", "rise", "fall")
if not isinstance(trigger, event.Source.Trigger) and trigger not in choices:
raise ValueError("Invalid trigger mode {!r}; must be one of {}"
.format(trigger, ", ".join(choices)))

self._trigger = event.Source.Trigger(trigger)
self._map = event.EventMap()
self._monitor = None
self._enable = None
self._pending = None
self._mux = Multiplexer(addr_width=1, data_width=data_width, alignment=alignment)
self._frozen = False

def freeze(self):
"""Freeze the event monitor.
Once the event monitor is frozen, subordinate sources cannot be added anymore.
"""
if self._frozen:
return
self._monitor = event.Monitor(self._map, trigger=self._trigger)
self._enable = Element(self._map.size, "rw")
self._pending = Element(self._map.size, "rw")
self._mux.add(self._enable, extend=True)
self._mux.add(self._pending, extend=True)
self._frozen = True

@property
def src(self):
"""Event source.
Return value
------------
An :class:`..event.Source`. Its input line is asserted by the monitor when a subordinate
event is enabled and pending.
"""
self.freeze()
return self._monitor.src

@property
def bus(self):
"""CSR bus interface.
Return value
------------
A :class:`..csr.Interface` providing access to registers.
"""
self.freeze()
return self._mux.bus

def add(self, src):
"""Add a subordinate event source.
See :meth:`..event.EventMap.add` for details.
"""
self._map.add(src)

def elaborate(self, platform):
self.freeze()

m = Module()
m.submodules.monitor = self._monitor
m.submodules.mux = self._mux

with m.If(self._enable.w_stb):
m.d.sync += self._monitor.enable.eq(self._enable.w_data)
m.d.comb += self._enable.r_data.eq(self._monitor.enable)

with m.If(self._pending.w_stb):
m.d.comb += self._monitor.clear.eq(self._pending.w_data)
m.d.comb += self._pending.r_data.eq(self._monitor.pending)

return m
146 changes: 146 additions & 0 deletions nmigen_soc/test/test_csr_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# nmigen: UnusedElaboratable=no

import unittest
from nmigen import *
from nmigen.back.pysim import *

from ..csr import *
from .. import event


def simulation_test(dut, process):
with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
sim.add_clock(1e-6)
sim.add_sync_process(process)
sim.run()


class EventMonitorTestCase(unittest.TestCase):
def test_params(self):
monitor = EventMonitor(data_width=16, alignment=4, trigger="rise")
self.assertEqual(monitor.bus.data_width, 16)
self.assertEqual(monitor.bus.memory_map.alignment, 4)
self.assertEqual(monitor.src.trigger, event.Source.Trigger.RISE)

def test_trigger_wrong(self):
with self.assertRaisesRegex(ValueError,
r"Invalid trigger mode 'foo'; must be one of level, rise, fall"):
EventMonitor(data_width=8, trigger="foo")

def test_add(self):
monitor = EventMonitor(data_width=8)
sub = event.Source()
monitor.add(sub)
self.assertEqual(monitor.src.event_map.size, 1)
self.assertEqual(monitor.src.event_map.index(sub), 0)

def test_freeze(self):
monitor = EventMonitor(data_width=8)
monitor.freeze()
sub = event.Source()
with self.assertRaisesRegex(ValueError,
r"Event map has been frozen. Cannot add source."):
monitor.add(sub)

def test_src_freeze(self):
monitor = EventMonitor(data_width=8)
monitor.src
sub = event.Source()
with self.assertRaisesRegex(ValueError,
r"Event map has been frozen. Cannot add source."):
monitor.add(sub)

def test_bus_freeze(self):
monitor = EventMonitor(data_width=8)
monitor.bus
sub = event.Source()
with self.assertRaisesRegex(ValueError,
r"Event map has been frozen. Cannot add source."):
monitor.add(sub)

def test_csr_regs(self):
monitor = EventMonitor(data_width=8)
sub_0 = event.Source()
sub_1 = event.Source()
monitor.add(sub_0)
monitor.add(sub_1)
resources = list(monitor.bus.memory_map.resources())
self.assertEqual(len(resources), 2)
enable, enable_range = resources[0]
pending, pending_range = resources[1]
self.assertEqual(
(enable.width, enable.access, enable_range),
(2, Element.Access.RW, (0, 1))
)
self.assertEqual(
(pending.width, pending.access, pending_range),
(2, Element.Access.RW, (1, 2))
)

def test_freeze_idempotent(self):
monitor = EventMonitor(data_width=8)
src = monitor.src
bus = monitor.bus
monitor.freeze()
self.assertIs(src, monitor.src)
self.assertIs(bus, monitor.bus)


class EventMonitorSimulationTestCase(unittest.TestCase):
def test_simple(self):
dut = EventMonitor(data_width=8)
sub = event.Source()
dut.add(sub)

addr_enable = 0x0
addr_pending = 0x1

def process():
yield sub.i.eq(1)
yield Delay()
self.assertEqual((yield sub.trg), 1)
self.assertEqual((yield dut.src.i), 0)

yield dut.bus.addr.eq(addr_enable)
yield dut.bus.r_stb.eq(1)
yield
yield dut.bus.r_stb.eq(0)
yield
self.assertEqual((yield dut.bus.r_data), 0b0)
yield

yield dut.bus.addr.eq(addr_enable)
yield dut.bus.w_stb.eq(1)
yield dut.bus.w_data.eq(0b1)
yield
yield dut.bus.w_stb.eq(0)
yield; yield Delay()

self.assertEqual((yield dut.src.i), 1)
yield sub.i.eq(0)
yield Delay()
self.assertEqual((yield sub.trg), 0)

yield dut.bus.addr.eq(addr_pending)
yield dut.bus.r_stb.eq(1)
yield
yield dut.bus.r_stb.eq(0)
yield
self.assertEqual((yield dut.bus.r_data), 0b1)
yield

yield dut.bus.addr.eq(addr_pending)
yield dut.bus.w_stb.eq(1)
yield dut.bus.w_data.eq(0b1)
yield
yield dut.bus.w_stb.eq(0)
yield

yield dut.bus.addr.eq(addr_pending)
yield dut.bus.r_stb.eq(1)
yield
yield dut.bus.r_stb.eq(0)
yield
self.assertEqual((yield dut.bus.r_data), 0b0)

simulation_test(dut, process)

0 comments on commit 517638c

Please sign in to comment.