diff --git a/nmigen_soc/csr/__init__.py b/nmigen_soc/csr/__init__.py index 3b2d416..35bfd29 100644 --- a/nmigen_soc/csr/__init__.py +++ b/nmigen_soc/csr/__init__.py @@ -1 +1,2 @@ from .bus import * +from .event import * diff --git a/nmigen_soc/csr/event.py b/nmigen_soc/csr/event.py new file mode 100644 index 0000000..4e76c6a --- /dev/null +++ b/nmigen_soc/csr/event.py @@ -0,0 +1,106 @@ +# nmigen: UnusedElaboratable=no + +from nmigen import * + +from . import Element, Multiplexer +from .. import event + + +__all__ = ["EventMonitor"] + + +class EventMonitor(Elaboratable): + """Event monitor. + + A monitor for subordinate event sources, with a CSR bus interface. + + CSR registers + ------------- + enable : ``self.src.event_map.size``, read/write + Enabled events. See :meth:`..event.EventMap.sources` for layout. + pending : ``self.src.event_map.size``, read/clear + Pending events. See :meth:`..event.EventMap.sources` for layout. + + Parameters + ---------- + data_width : int + CSR bus data width. See :class:`..csr.Interface`. + alignment : int + CSR address alignment. See :class:`..memory.MemoryMap`. + trigger : :class:`..event.Source.Trigger` + Trigger mode. See :class:`..event.Source`. + """ + def __init__(self, *, data_width, alignment=0, trigger="level"): + choices = ("level", "rise", "fall") + if not isinstance(trigger, event.Source.Trigger) and trigger not in choices: + raise ValueError("Invalid trigger mode {!r}; must be one of {}" + .format(trigger, ", ".join(choices))) + + self._trigger = event.Source.Trigger(trigger) + self._map = event.EventMap() + self._monitor = None + self._enable = None + self._pending = None + self._mux = Multiplexer(addr_width=1, data_width=data_width, alignment=alignment) + self._frozen = False + + def freeze(self): + """Freeze the event monitor. + + Once the event monitor is frozen, subordinate sources cannot be added anymore. + """ + if self._frozen: + return + self._monitor = event.Monitor(self._map, trigger=self._trigger) + self._enable = Element(self._map.size, "rw") + self._pending = Element(self._map.size, "rw") + self._mux.add(self._enable, extend=True) + self._mux.add(self._pending, extend=True) + self._frozen = True + + @property + def src(self): + """Event source. + + Return value + ------------ + An :class:`..event.Source`. Its input line is asserted by the monitor when a subordinate + event is enabled and pending. + """ + self.freeze() + return self._monitor.src + + @property + def bus(self): + """CSR bus interface. + + Return value + ------------ + A :class:`..csr.Interface` providing access to registers. + """ + self.freeze() + return self._mux.bus + + def add(self, src): + """Add a subordinate event source. + + See :meth:`..event.EventMap.add` for details. + """ + self._map.add(src) + + def elaborate(self, platform): + self.freeze() + + m = Module() + m.submodules.monitor = self._monitor + m.submodules.mux = self._mux + + with m.If(self._enable.w_stb): + m.d.sync += self._monitor.enable.eq(self._enable.w_data) + m.d.comb += self._enable.r_data.eq(self._monitor.enable) + + with m.If(self._pending.w_stb): + m.d.comb += self._monitor.clear.eq(self._pending.w_data) + m.d.comb += self._pending.r_data.eq(self._monitor.pending) + + return m diff --git a/nmigen_soc/test/test_csr_event.py b/nmigen_soc/test/test_csr_event.py new file mode 100644 index 0000000..631e58e --- /dev/null +++ b/nmigen_soc/test/test_csr_event.py @@ -0,0 +1,146 @@ +# nmigen: UnusedElaboratable=no + +import unittest +from nmigen import * +from nmigen.back.pysim import * + +from ..csr import * +from .. import event + + +def simulation_test(dut, process): + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(process) + sim.run() + + +class EventMonitorTestCase(unittest.TestCase): + def test_params(self): + monitor = EventMonitor(data_width=16, alignment=4, trigger="rise") + self.assertEqual(monitor.bus.data_width, 16) + self.assertEqual(monitor.bus.memory_map.alignment, 4) + self.assertEqual(monitor.src.trigger, event.Source.Trigger.RISE) + + def test_trigger_wrong(self): + with self.assertRaisesRegex(ValueError, + r"Invalid trigger mode 'foo'; must be one of level, rise, fall"): + EventMonitor(data_width=8, trigger="foo") + + def test_add(self): + monitor = EventMonitor(data_width=8) + sub = event.Source() + monitor.add(sub) + self.assertEqual(monitor.src.event_map.size, 1) + self.assertEqual(monitor.src.event_map.index(sub), 0) + + def test_freeze(self): + monitor = EventMonitor(data_width=8) + monitor.freeze() + sub = event.Source() + with self.assertRaisesRegex(ValueError, + r"Event map has been frozen. Cannot add source."): + monitor.add(sub) + + def test_src_freeze(self): + monitor = EventMonitor(data_width=8) + monitor.src + sub = event.Source() + with self.assertRaisesRegex(ValueError, + r"Event map has been frozen. Cannot add source."): + monitor.add(sub) + + def test_bus_freeze(self): + monitor = EventMonitor(data_width=8) + monitor.bus + sub = event.Source() + with self.assertRaisesRegex(ValueError, + r"Event map has been frozen. Cannot add source."): + monitor.add(sub) + + def test_csr_regs(self): + monitor = EventMonitor(data_width=8) + sub_0 = event.Source() + sub_1 = event.Source() + monitor.add(sub_0) + monitor.add(sub_1) + resources = list(monitor.bus.memory_map.resources()) + self.assertEqual(len(resources), 2) + enable, enable_range = resources[0] + pending, pending_range = resources[1] + self.assertEqual( + (enable.width, enable.access, enable_range), + (2, Element.Access.RW, (0, 1)) + ) + self.assertEqual( + (pending.width, pending.access, pending_range), + (2, Element.Access.RW, (1, 2)) + ) + + def test_freeze_idempotent(self): + monitor = EventMonitor(data_width=8) + src = monitor.src + bus = monitor.bus + monitor.freeze() + self.assertIs(src, monitor.src) + self.assertIs(bus, monitor.bus) + + +class EventMonitorSimulationTestCase(unittest.TestCase): + def test_simple(self): + dut = EventMonitor(data_width=8) + sub = event.Source() + dut.add(sub) + + addr_enable = 0x0 + addr_pending = 0x1 + + def process(): + yield sub.i.eq(1) + yield Delay() + self.assertEqual((yield sub.trg), 1) + self.assertEqual((yield dut.src.i), 0) + + yield dut.bus.addr.eq(addr_enable) + yield dut.bus.r_stb.eq(1) + yield + yield dut.bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.bus.r_data), 0b0) + yield + + yield dut.bus.addr.eq(addr_enable) + yield dut.bus.w_stb.eq(1) + yield dut.bus.w_data.eq(0b1) + yield + yield dut.bus.w_stb.eq(0) + yield; yield Delay() + + self.assertEqual((yield dut.src.i), 1) + yield sub.i.eq(0) + yield Delay() + self.assertEqual((yield sub.trg), 0) + + yield dut.bus.addr.eq(addr_pending) + yield dut.bus.r_stb.eq(1) + yield + yield dut.bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.bus.r_data), 0b1) + yield + + yield dut.bus.addr.eq(addr_pending) + yield dut.bus.w_stb.eq(1) + yield dut.bus.w_data.eq(0b1) + yield + yield dut.bus.w_stb.eq(0) + yield + + yield dut.bus.addr.eq(addr_pending) + yield dut.bus.r_stb.eq(1) + yield + yield dut.bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.bus.r_data), 0b0) + + simulation_test(dut, process)