diff --git a/nmigen_soc/test/test_wishbone_bus.py b/nmigen_soc/test/test_wishbone_bus.py index 266580f..6cfa84d 100644 --- a/nmigen_soc/test/test_wishbone_bus.py +++ b/nmigen_soc/test/test_wishbone_bus.py @@ -123,6 +123,261 @@ def test_set_map_wrong_addr_width(self): iface.memory_map = MemoryMap(addr_width=30, data_width=8) +class ConnectorTestCase(unittest.TestCase): + def test_wrong_intr(self): + sub_bus = Interface(addr_width=10, data_width=8) + with self.assertRaisesRegexp(TypeError, + r"Initiator bus must be an instance of wishbone.Interface, not 'foo'"): + Connector(intr_bus="foo", sub_bus=sub_bus) + + def test_wrong_sub(self): + intr_bus = Interface(addr_width=10, data_width=8) + with self.assertRaisesRegexp(TypeError, + r"Subordinate bus must be an instance of wishbone.Interface, not 'foo'"): + Connector(intr_bus=intr_bus, sub_bus="foo") + + def test_wrong_bitsize(self): + intr_bus = Interface(addr_width=10, data_width=32) + sub_bus = Interface(addr_width=10, data_width=8) + with self.assertRaisesRegexp(ValueError, + r"Total bit size of initiator and subordinate bus have to be the same"): + Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def test_wrong_granularity(self): + intr_bus = Interface(addr_width=12, data_width=8) + sub_bus = Interface(addr_width=10, data_width=32) + with self.assertRaisesRegexp(ValueError, + r"Granularity of subordinate bus has to be smaller or equal to " + r"granulariy of initiator bus"): + Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def test_lock_mismatch(self): + intr_bus = Interface(addr_width=10, data_width=8, features={"lock"}) + sub_bus = Interface(addr_width=10, data_width=8) + with self.assertRaisesRegexp(ValueError, + r"Initiator bus has optional output 'lock', but the subordinate bus " + r"does not have a corresponding input"): + Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def test_err_mismatch(self): + intr_bus = Interface(addr_width=10, data_width=8) + sub_bus = Interface(addr_width=10, data_width=8, features={"err"}) + with self.assertRaisesRegexp(ValueError, + r"Subordinate bus has optional output 'err', but the initiator bus " + r"does not have a corresponding input"): + Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def test_rty_mismatch(self): + intr_bus = Interface(addr_width=10, data_width=8) + sub_bus = Interface(addr_width=10, data_width=8, features={"rty"}) + with self.assertRaisesRegexp(ValueError, + r"Subordinate bus has optional output 'rty', but the initiator bus " + r"does not have a corresponding input"): + Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def test_not_implemented_multicycle(self): + intr_bus = Interface(addr_width=10, data_width=32) + sub_bus = Interface(addr_width=12, data_width=8) + with self.assertRaisesRegexp(NotImplementedError, + r"Support for multi-cycle bus operation when initiator data_width is" + r"bigger than the subordinate one is not implemented."): + Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + +class ConnectorSimulationTestCase(unittest.TestCase): + def test_same(self): + intr_bus = Interface(addr_width=10, data_width=32, granularity=8) + sub_bus = Interface(addr_width=10, data_width=32, granularity=8) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.adr.eq(1) + yield intr_bus.we.eq(0) + yield intr_bus.cyc.eq(1) + yield intr_bus.stb.eq(1) + yield intr_bus.sel.eq(5) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.adr), 1) + self.assertEqual((yield sub_bus.we), 0) + self.assertEqual((yield sub_bus.cyc), 1) + self.assertEqual((yield sub_bus.stb), 1) + self.assertEqual((yield sub_bus.sel), 5) + yield sub_bus.ack.eq(1) + yield Delay(1e-6) + self.assertEqual((yield intr_bus.ack), 1) + yield intr_bus.adr.eq(127) + yield intr_bus.we.eq(1) + yield intr_bus.cyc.eq(1) + yield intr_bus.stb.eq(0) + yield intr_bus.sel.eq(10) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.adr), 127) + self.assertEqual((yield sub_bus.we), 1) + self.assertEqual((yield sub_bus.cyc), 1) + self.assertEqual((yield sub_bus.stb), 0) + self.assertEqual((yield sub_bus.sel), 10) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_same_pipelined(self): + intr_bus = Interface(addr_width=10, data_width=8, features={"stall"}) + sub_bus = Interface(addr_width=10, data_width=8, features={"stall"}) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.adr.eq(1) + yield intr_bus.we.eq(0) + yield intr_bus.cyc.eq(1) + yield intr_bus.stb.eq(1) + yield intr_bus.sel.eq(1) + yield sub_bus.stall.eq(1) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.adr), 1) + self.assertEqual((yield sub_bus.we), 0) + self.assertEqual((yield sub_bus.cyc), 1) + self.assertEqual((yield sub_bus.stb), 1) + self.assertEqual((yield sub_bus.sel), 1) + self.assertEqual((yield intr_bus.stall), 1) + yield sub_bus.stall.eq(0) + yield Delay(1e-6) + self.assertEqual((yield intr_bus.stall), 0) + yield sub_bus.ack.eq(1) + yield Delay(1e-6) + self.assertEqual((yield intr_bus.ack), 1) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_default(self): + intr_bus = Interface(addr_width=10, data_width=8, features={"err", "rty"}) + sub_bus = Interface(addr_width=10, data_width=8, features={"lock", "cti", "bte"}) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield Delay(1e-6) + self.assertEqual((yield intr_bus.err), 0) + self.assertEqual((yield intr_bus.rty), 0) + self.assertEqual((yield sub_bus.lock), 0) + self.assertEqual((yield sub_bus.cti), CycleType.CLASSIC.value) + self.assertEqual((yield sub_bus.bte), BurstTypeExt.LINEAR.value) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_conv_granularity(self): + intr_bus = Interface(addr_width=10, data_width=32) + sub_bus = Interface(addr_width=10, data_width=32, granularity=8) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.sel.eq(1) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.sel), 0b1111) + yield intr_bus.sel.eq(0) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.sel), 0b0000) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_conv_addr_width(self): + intr_bus = Interface(addr_width=12, data_width=8) + sub_bus = Interface(addr_width=10, data_width=32, granularity=8) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.adr.eq(1) + yield intr_bus.sel.eq(1) + yield intr_bus.dat_w.eq(0xA5) + yield sub_bus.dat_r.eq(0x03020100) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.sel), 0b0010) + self.assertEqual((yield sub_bus.dat_w), 0xA5A5A5A5) + self.assertEqual((yield intr_bus.dat_r), 0x01) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_conv_granularity_addr_width(self): + intr_bus = Interface(addr_width=11, data_width=16) + sub_bus = Interface(addr_width=10, data_width=32, granularity=8) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.adr.eq(3) + yield intr_bus.sel.eq(1) + yield intr_bus.dat_w.eq(0xA55A) + yield sub_bus.dat_r.eq(0x03020100) + yield Delay(1e-6) + self.assertEqual((yield sub_bus.sel), 0b1100) + self.assertEqual((yield sub_bus.dat_w), 0xA55AA55A) + self.assertEqual((yield intr_bus.dat_r), 0x0302) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_pipelined_initiator(self): + intr_bus = Interface(addr_width=10, data_width=8, features={"stall"}) + sub_bus = Interface(addr_width=10, data_width=8) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.adr.eq(1) + yield intr_bus.sel.eq(1) + yield intr_bus.cyc.eq(1) + yield intr_bus.stb.eq(1) + yield Delay(1e-7) + self.assertEqual((yield sub_bus.cyc), 1) + self.assertEqual((yield sub_bus.stb), 1) + self.assertEqual((yield intr_bus.ack), 0) + self.assertEqual((yield intr_bus.stall), 1) + yield Delay(1e-7) + yield sub_bus.ack.eq(1) + yield Delay(1e-7) + self.assertEqual((yield intr_bus.stall), 0) + + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_process(sim_test()) + sim.run() + + def test_pipelined_subordinate(self): + intr_bus = Interface(addr_width=10, data_width=8) + sub_bus = Interface(addr_width=10, data_width=8, features={"stall"}) + dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus) + + def sim_test(): + yield intr_bus.adr.eq(1) + yield intr_bus.sel.eq(1) + yield intr_bus.cyc.eq(1) + yield intr_bus.stb.eq(1) + yield Delay(1e-8) + self.assertEqual((yield sub_bus.cyc), 1) + self.assertEqual((yield sub_bus.stb), 1) + self.assertEqual((yield intr_bus.ack), 0) + yield + yield sub_bus.ack.eq(1) + yield Delay(1e-8) + self.assertEqual((yield intr_bus.ack), 1) + yield intr_bus.stb.eq(0) + yield + self.assertEqual((yield intr_bus.ack), 1) + yield sub_bus.ack.eq(0) + yield + self.assertEqual((yield intr_bus.ack), 0) + + with Simulator(dut, vcd_file=open("test_debug.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(sim_test()) + sim.run() + + class DecoderTestCase(unittest.TestCase): def setUp(self): self.dut = Decoder(addr_width=31, data_width=32, granularity=16) diff --git a/nmigen_soc/wishbone/bus.py b/nmigen_soc/wishbone/bus.py index a62949b..ea78a43 100644 --- a/nmigen_soc/wishbone/bus.py +++ b/nmigen_soc/wishbone/bus.py @@ -1,12 +1,13 @@ from enum import Enum from nmigen import * +from nmigen.lib import coding from nmigen.hdl.rec import Direction from nmigen.utils import log2_int from ..memory import MemoryMap -__all__ = ["CycleType", "BurstTypeExt", "Interface", "Decoder", "Arbiter"] +__all__ = ["CycleType", "BurstTypeExt", "Interface", "Connector", "Decoder", "Arbiter"] class CycleType(Enum): @@ -169,6 +170,122 @@ def memory_map(self, memory_map): self._map = memory_map +class Connector(Elaboratable): + """Module to connect one Wishbone initiator bus to one Wishbone subordinate bus + + Ths class also handles data_width conversion if the two buses have compatible + granularity. This means that granularity of subordinate bus has to be smaller or + equal to the granularity of the initiator bus. + Currently initiating multiple subordinate bus cycles for initiator bus cycle is + not implement. As a consequence an initiator bus data width bigger than the + subordinate bus data width is not supported. + + Parameters: + ----------- + intr_bus : :class:`Interface` + The initiator bus + sub_bus : :class:`Interface` + The subordinate bus + """ + def __init__(self, *, intr_bus, sub_bus): + if not isinstance(intr_bus, Interface): + raise TypeError("Initiator bus must be an instance of wishbone.Interface, not {!r}" + .format(intr_bus)) + if not isinstance(sub_bus, Interface): + raise TypeError("Subordinate bus must be an instance of wishbone.Interface, not {!r}" + .format(sub_bus)) + intr_size = (2**intr_bus.addr_width)*intr_bus.data_width + sub_size = (2**sub_bus.addr_width)*sub_bus.data_width + if intr_size != sub_size: + raise ValueError("Total bit size of initiator and subordinate bus have to be the same") + if sub_bus.granularity > intr_bus.granularity: + raise ValueError( + "Granularity of subordinate bus has to be smaller or equal to " + "granulariy of initiator bus" + ) + for opt_output in {"lock"}: + if hasattr(intr_bus, opt_output) and not hasattr(sub_bus, opt_output): + raise ValueError("Initiator bus has optional output {!r}, but the subordinate bus " + "does not have a corresponding input" + .format(opt_output)) + for opt_output in {"err", "rty"}: + if hasattr(sub_bus, opt_output) and not hasattr(intr_bus, opt_output): + raise ValueError("Subordinate bus has optional output {!r}, but the initiator bus " + "does not have a corresponding input" + .format(opt_output)) + if intr_bus.data_width > sub_bus.data_width: + raise NotImplementedError( + "Support for multi-cycle bus operation when initiator data_width is" + "bigger than the subordinate one is not implemented." + ) + + self.intr_bus = intr_bus + self.sub_bus = sub_bus + + def elaborate(self, platform): + m = Module() + + common_addr_width = min(self.intr_bus.addr_width, self.sub_bus.addr_width) + m.d.comb += [ + self.sub_bus.cyc.eq(self.intr_bus.cyc), + self.sub_bus.we.eq(self.intr_bus.we), + self.sub_bus.adr[(self.sub_bus.addr_width-common_addr_width):self.sub_bus.addr_width].eq( + self.intr_bus.adr[(self.intr_bus.addr_width-common_addr_width):self.intr_bus.addr_width] + ), + self.intr_bus.ack.eq(self.sub_bus.ack), + ] + if hasattr(self.intr_bus, "err"): + m.d.comb += self.intr_bus.err.eq(getattr(self.sub_bus, "err", 0)) + if hasattr(self.intr_bus, "rty"): + m.d.comb += self.intr_bus.rty.eq(getattr(self.sub_bus, "rty", 0)) + if hasattr(self.sub_bus, "lock"): + m.d.comb += self.sub_bus.lock.eq(getattr(self.intr_bus, "lock", 0)) + if hasattr(self.sub_bus, "cti"): + m.d.comb += self.sub_bus.cti.eq(getattr(self.intr_bus, "cti", CycleType.CLASSIC)) + if hasattr(self.sub_bus, "bte"): + m.d.comb += self.sub_bus.bte.eq(getattr(self.intr_bus, "bte", BurstTypeExt.LINEAR)) + + # stb and stall for different pipeline combinations of initiator and subordinate bus + if hasattr(self.intr_bus, "stall") == hasattr(self.sub_bus, "stall"): + m.d.comb += self.sub_bus.stb.eq(self.intr_bus.stb) + if hasattr(self.intr_bus, "stall"): + m.d.comb += self.intr_bus.stall.eq(self.sub_bus.stall) + elif hasattr(self.intr_bus, "stall"): + # See Wishbone B4 spec: 5.2 Pipelined master connected to standard slave + m.d.comb += [ + self.sub_bus.stb.eq(self.intr_bus.stb), + self.intr_bus.stall.eq(self.intr_bus.cyc & ~self.sub_bus.ack), + ] + else: + # See Wishbone B4 spec: 5.1 Standard master connected to pipelined slave + wait4ack = Signal() + m.d.sync += wait4ack.eq(self.intr_bus.stb & ~(wait4ack & self.sub_bus.ack)) + m.d.comb += self.sub_bus.stb.eq(self.intr_bus.stb & ~wait4ack) + + # Data and sel width conversion + m.submodules.sel_decoder = sel_decoder = coding.Decoder(width=1<<(self.intr_bus.addr_width-self.sub_bus.addr_width)) + sel_from_granularity = Const(-1, self.intr_bus.granularity//self.sub_bus.granularity) + m.d.comb += [ + sel_decoder.i.eq(self.intr_bus.adr[:self.intr_bus.addr_width-self.sub_bus.addr_width]), + self.sub_bus.dat_w.eq(Repl(self.intr_bus.dat_w, self.sub_bus.data_width//self.intr_bus.data_width)), + self.intr_bus.dat_r.eq(self.sub_bus.dat_r.word_select(sel_decoder.i, self.intr_bus.data_width)), + self.sub_bus.sel.eq( + Cat( + Cat( + Cat( + sel_a & sel_i & sel_g + for sel_g in sel_from_granularity + ) + for sel_i in self.intr_bus.sel + ) + for sel_a in sel_decoder.o + ) + ), + ] + + return m + + class Decoder(Elaboratable): """Wishbone bus decoder.