diff --git a/src/middlewared/debian/control b/src/middlewared/debian/control
index 48e41da524629..249a1c02832de 100644
--- a/src/middlewared/debian/control
+++ b/src/middlewared/debian/control
@@ -63,6 +63,7 @@ Build-Depends: alembic,
                python3-truenas-api-client,
                python3-truenas-connect-utils,
                python3-truenas-crypto-utils,
+               python3-truenas-spdk,
                python3-truenas-verify,
                python3-websocket,
                python3-zeroconf,
@@ -156,6 +157,7 @@ Depends: alembic,
          python3-truenas-api-client,
          python3-truenas-connect-utils,
          python3-truenas-crypto-utils,
+         python3-truenas-spdk,
          python3-truenas-verify,
          python3-websocket,
          python3-zeroconf,
diff --git a/src/middlewared/middlewared/alembic/versions/25.10/2025-07-16_16-23_nvmet_dedicated_nic.py b/src/middlewared/middlewared/alembic/versions/25.10/2025-07-16_16-23_nvmet_dedicated_nic.py
new file mode 100644
index 0000000000000..09336582aff23
--- /dev/null
+++ b/src/middlewared/middlewared/alembic/versions/25.10/2025-07-16_16-23_nvmet_dedicated_nic.py
@@ -0,0 +1,28 @@
+"""Add dedicated_nic to nvmet port config
+
+This will be used with SPDK.
+
+Revision ID: 72b63cd393d3
+Revises: 3d738dbd75ef
+Create Date: 2025-07-16 16:23:12.160536+00:00
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '72b63cd393d3'
+down_revision = '3d738dbd75ef'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    with op.batch_alter_table('services_nvmet_port', schema=None) as batch_op:
+        batch_op.add_column(sa.Column('nvmet_port_dedicated_nic', sa.Boolean(), server_default='0', nullable=False))
+
+
+def downgrade():
+    with op.batch_alter_table('services_nvmet_port', schema=None) as batch_op:
+        batch_op.drop_column('nvmet_port_dedicated_nic')
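For reviewers unfamiliar with `batch_alter_table` on SQLite: the column addition above backfills existing rows via the server default. A minimal standalone sketch of the net effect, using a hypothetical table shape and plain sqlite3 rather than Alembic's table-copy machinery:

```python
import sqlite3

# Hypothetical, minimal stand-in for services_nvmet_port.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE services_nvmet_port (id INTEGER PRIMARY KEY, nvmet_port_index INTEGER)")
conn.execute("INSERT INTO services_nvmet_port (nvmet_port_index) VALUES (1)")

# Rough equivalent of add_column(..., server_default='0', nullable=False):
# pre-existing rows are backfilled with the default.
conn.execute(
    "ALTER TABLE services_nvmet_port "
    "ADD COLUMN nvmet_port_dedicated_nic BOOLEAN NOT NULL DEFAULT 0"
)
print(conn.execute("SELECT * FROM services_nvmet_port").fetchall())  # [(1, 1, 0)]
```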
+ """ class NVMetPortCreateTemplate(NVMetPortEntry, ABC): diff --git a/src/middlewared/middlewared/etc_files/nvmet_kernel.py b/src/middlewared/middlewared/etc_files/nvmet_kernel.py index 0953db4534220..c04db73018cac 100644 --- a/src/middlewared/middlewared/etc_files/nvmet_kernel.py +++ b/src/middlewared/middlewared/etc_files/nvmet_kernel.py @@ -1,4 +1,4 @@ -from middlewared.plugins.nvmet.kernel import write_config +from middlewared.utils.nvmet.kernel import write_config def render(service, middleware, render_ctx): diff --git a/src/middlewared/middlewared/etc_files/nvmet_spdk.py b/src/middlewared/middlewared/etc_files/nvmet_spdk.py new file mode 100644 index 0000000000000..016e45b8556f7 --- /dev/null +++ b/src/middlewared/middlewared/etc_files/nvmet_spdk.py @@ -0,0 +1,35 @@ +from middlewared.utils.nvmet.spdk import write_config +from middlewared.plugins.nvmet.constants import NAMESPACE_DEVICE_TYPE + + +def render(service, middleware, render_ctx): + if middleware.call_sync('nvmet.spdk.nvmf_ready', True): + + # If we have any namespaces that are configured which are FILE + # type, then we need to work out the blocksize for each one. + # This will be the recordsize of the underlying dataset. + fns = {ns['device_path'] for ns in filter(lambda ns: ns.get('device_type') == NAMESPACE_DEVICE_TYPE.FILE.api, + render_ctx['nvmet.namespace.query'])} + if fns: + record_sizes = {f'{item["mountpoint"]}/': int(item['recordsize']['rawvalue']) for item in + middleware.call_sync('pool.dataset.query', + [["mountpoint", "!=", None]], + {"select": ["name", + "children", + "mountpoint", + "recordsize.rawvalue"]})} + path_to_recordsize = {} + for path in fns: + longest_match = 0 + matched_value = None + for key, value in record_sizes.items(): + if path.startswith(key): + if (length := len(key)) > longest_match: + longest_match = length + matched_value = value + if matched_value: + path_to_recordsize[path] = matched_value + # Inject into context + render_ctx['path_to_recordsize'] = path_to_recordsize + + write_config(render_ctx) diff --git a/src/middlewared/middlewared/plugins/etc.py b/src/middlewared/middlewared/plugins/etc.py index 862995e6fed14..8b13216990ebb 100644 --- a/src/middlewared/middlewared/plugins/etc.py +++ b/src/middlewared/middlewared/plugins/etc.py @@ -201,6 +201,7 @@ class EtcService(Service): ], 'entries': [ {'type': 'py', 'path': 'nvmet_kernel'}, + {'type': 'py', 'path': 'nvmet_spdk'}, ] }, 'pam': { diff --git a/src/middlewared/middlewared/plugins/nvmet/constants.py b/src/middlewared/middlewared/plugins/nvmet/constants.py index abcfc96d2fec6..a1d417a0f0186 100644 --- a/src/middlewared/middlewared/plugins/nvmet/constants.py +++ b/src/middlewared/middlewared/plugins/nvmet/constants.py @@ -1,9 +1,7 @@ import enum NVMET_KERNEL_CONFIG_DIR = '/sys/kernel/config/nvmet' -NVMET_NODE_A_ANA_GRPID = 2 -NVMET_NODE_B_ANA_GRPID = 3 - +NVMET_DISCOVERY_NQN = 'nqn.2014-08.org.nvmexpress.discovery' NVMET_NQN_UUID = 'nqn.2011-06.com.truenas:uuid' NVMET_SERVICE_NAME = 'nvmet' @@ -24,6 +22,10 @@ def api(self): def sysfs(self): return self.value[2] + @property + def spdk(self): + return self.value[3] + @classmethod def by_db(cls, needle, raise_exception=True): for x in cls.__members__.values(): @@ -68,10 +70,10 @@ class PORT_TRTYPE(ApiMapper): class PORT_ADDR_FAMILY(ApiMapper): - IPV4 = (1, 'IPV4', 'ipv4') - IPV6 = (2, 'IPV6', 'ipv6') - IB = (3, 'IB', 'ib') - FC = (4, 'FC', 'fc') + IPV4 = (1, 'IPV4', 'ipv4', 'IPv4') + IPV6 = (2, 'IPV6', 'ipv6', 'IPv6') + IB = (3, 'IB', 'ib', 'IB') + FC = (4, 'FC', 'fc', 'FC') def 
diff --git a/src/middlewared/middlewared/plugins/etc.py b/src/middlewared/middlewared/plugins/etc.py
index 862995e6fed14..8b13216990ebb 100644
--- a/src/middlewared/middlewared/plugins/etc.py
+++ b/src/middlewared/middlewared/plugins/etc.py
@@ -201,6 +201,7 @@ class EtcService(Service):
         ],
         'entries': [
             {'type': 'py', 'path': 'nvmet_kernel'},
+            {'type': 'py', 'path': 'nvmet_spdk'},
         ]
     },
     'pam': {
diff --git a/src/middlewared/middlewared/plugins/nvmet/constants.py b/src/middlewared/middlewared/plugins/nvmet/constants.py
index abcfc96d2fec6..a1d417a0f0186 100644
--- a/src/middlewared/middlewared/plugins/nvmet/constants.py
+++ b/src/middlewared/middlewared/plugins/nvmet/constants.py
@@ -1,9 +1,7 @@
 import enum
 
 NVMET_KERNEL_CONFIG_DIR = '/sys/kernel/config/nvmet'
-NVMET_NODE_A_ANA_GRPID = 2
-NVMET_NODE_B_ANA_GRPID = 3
-
+NVMET_DISCOVERY_NQN = 'nqn.2014-08.org.nvmexpress.discovery'
 NVMET_NQN_UUID = 'nqn.2011-06.com.truenas:uuid'
 NVMET_SERVICE_NAME = 'nvmet'
 
@@ -24,6 +22,10 @@ def api(self):
     def sysfs(self):
         return self.value[2]
 
+    @property
+    def spdk(self):
+        return self.value[3]
+
     @classmethod
     def by_db(cls, needle, raise_exception=True):
         for x in cls.__members__.values():
@@ -68,10 +70,10 @@ class PORT_TRTYPE(ApiMapper):
 
 
 class PORT_ADDR_FAMILY(ApiMapper):
-    IPV4 = (1, 'IPV4', 'ipv4')
-    IPV6 = (2, 'IPV6', 'ipv6')
-    IB = (3, 'IB', 'ib')
-    FC = (4, 'FC', 'fc')
+    IPV4 = (1, 'IPV4', 'ipv4', 'IPv4')
+    IPV6 = (2, 'IPV6', 'ipv6', 'IPv6')
+    IB = (3, 'IB', 'ib', 'IB')
+    FC = (4, 'FC', 'fc', 'FC')
 
 
 def port_transport_family_generator():
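A self-contained sketch of the ApiMapper pattern being extended here, with hypothetical stand-ins for the real classes; each enum member now carries both the kernel (sysfs) and SPDK spellings of one concept:

```python
import enum

class ApiMapper(enum.Enum):
    # Mirrors the real ApiMapper in constants.py: value is a tuple of
    # (db, api, sysfs, spdk) representations.
    @property
    def sysfs(self):
        return self.value[2]

    @property
    def spdk(self):
        return self.value[3]

class PORT_ADDR_FAMILY(ApiMapper):
    IPV4 = (1, 'IPV4', 'ipv4', 'IPv4')
    IPV6 = (2, 'IPV6', 'ipv6', 'IPv6')

# One member maps a single concept onto each backend's spelling:
assert PORT_ADDR_FAMILY.IPV4.sysfs == 'ipv4'   # kernel configfs value
assert PORT_ADDR_FAMILY.IPV4.spdk == 'IPv4'    # SPDK RPC adrfam value
```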
diff --git a/src/middlewared/middlewared/plugins/nvmet/global.py b/src/middlewared/middlewared/plugins/nvmet/global.py
index 28d11019e0647..9755df0b3b0d1 100644
--- a/src/middlewared/middlewared/plugins/nvmet/global.py
+++ b/src/middlewared/middlewared/plugins/nvmet/global.py
@@ -14,11 +14,14 @@
 from middlewared.service import SystemServiceService, ValidationErrors, filterable_api_method, private
 from middlewared.utils import filter_list
 from .constants import NVMET_SERVICE_NAME
-from .kernel import clear_config, load_modules, nvmet_kernel_module_loaded, unload_module
+from middlewared.utils.nvmet.kernel import clear_config, load_modules, nvmet_kernel_module_loaded, unload_module
+from middlewared.utils.nvmet.spdk import make_client, nvmf_subsystem_get_qpairs
 from .mixin import NVMetStandbyMixin
 from .utils import uuid_nqn
 
 NVMET_DEBUG_DIR = '/sys/kernel/debug/nvmet'
+NVMF_SERVICE = 'nvmf'
+AVX2_FLAG = 'avx2'
 
 
 class NVMetGlobalModel(sa.Model):
@@ -69,9 +72,11 @@ async def do_update(self, data):
     async def __ana_forbidden(self):
         return not await self.middleware.call('failover.licensed')
 
+    async def __spdk_forbidden(self):
+        # For now we'll disallow SPDK if any CPU is missing avx2
+        return any(AVX2_FLAG not in flags for flags in (await self.middleware.call('system.cpu_flags')).values())
+
     async def __validate(self, verrors, data, schema_name, old=None):
-        if not data.get('kernel', False):
-            verrors.add(f'{schema_name}.kernel', 'Cannot disable kernel mode.')
         if data['rdma'] and old['rdma'] != data['rdma']:
             available_rdma_protocols = await self.middleware.call('rdma.capable_protocols')
             if RDMAprotocols.NVMET.value not in available_rdma_protocols:
@@ -85,6 +90,17 @@ async def __validate(self, verrors, data, schema_name, old=None):
                     f'{schema_name}.ana',
                     'This platform does not support Asymmetric Namespace Access(ANA).'
                 )
+        if old['kernel'] != data['kernel']:
+            if not data['kernel'] and await self.__spdk_forbidden():
+                verrors.add(
+                    f'{schema_name}.kernel',
+                    'Cannot switch nvmet backend because CPU lacks required capabilities.'
+                )
+            elif await self.running():
+                verrors.add(
+                    f'{schema_name}.kernel',
+                    'Cannot switch nvmet backend while the service is running.'
+                )
 
     @api_method(
         NVMetGlobalAnaEnabledArgs,
@@ -133,9 +149,9 @@ async def rdma_enabled(self):
     async def sessions(self, filters, options):
         sessions = []
         subsys_id = None
-        for filter in filters:
-            if len(filter) == 3 and filter[0] == 'subsys_id' and filter[1] == '=':
-                subsys_id = filter[2]
+        for _filter in filters:
+            if len(_filter) == 3 and _filter[0] == 'subsys_id' and _filter[1] == '=':
+                subsys_id = _filter[2]
                 break
         sessions = await self.middleware.call('nvmet.global.local_sessions', subsys_id)
         if await self.ana_enabled():
@@ -171,33 +187,74 @@ def local_sessions(self, subsys_id=None):
         sessions = []
         global_info = self.middleware.call_sync('nvmet.global.config')
         subsystems = self.middleware.call_sync('nvmet.subsys.query')
+        ports = self.middleware.call_sync('nvmet.port.query')
+        ha = self.middleware.call_sync('failover.licensed')
         if global_info['kernel']:
             nvmet_debug_path = pathlib.Path(NVMET_DEBUG_DIR)
             if not nvmet_debug_path.exists():
                 return sessions
-            port_index_to_id = {port['index']: port['id'] for port in self.middleware.call_sync('nvmet.port.query')}
+            port_index_to_id = {port['index']: port['id'] for port in ports}
 
-            if subsys_id is None:
-                basenqn = global_info['basenqn']
-                subsys_name_to_subsys_id = {f'{basenqn}:{subsys["name"]}': subsys['id'] for subsys in subsystems}
-                for subsys in nvmet_debug_path.iterdir():
-                    if subsys_id := subsys_name_to_subsys_id.get(subsys.name):
-                        for ctrl in subsys.iterdir():
-                            if session := self.__parse_session_dir(ctrl, port_index_to_id):
-                                session['subsys_id'] = subsys_id
-                                sessions.append(session)
-            else:
-                for subsys in subsystems:
-                    if subsys['id'] == subsys_id:
-                        subnqn = f'{global_info["basenqn"]}:{subsys["name"]}'
-                        path = nvmet_debug_path / subnqn
-                        if path.is_dir():
-                            for ctrl in path.iterdir():
+            if subsys_id is None:
+                subsys_name_to_subsys_id = {subsys['subnqn']: subsys['id'] for subsys in subsystems}
+                for subsys in nvmet_debug_path.iterdir():
+                    if subsys_id := subsys_name_to_subsys_id.get(subsys.name):
+                        for ctrl in subsys.iterdir():
                             if session := self.__parse_session_dir(ctrl, port_index_to_id):
                                 session['subsys_id'] = subsys_id
                                 sessions.append(session)
+            else:
+                for subsys in subsystems:
+                    if subsys['id'] == subsys_id:
+                        subnqn = subsys['subnqn']
+                        path = nvmet_debug_path / subnqn
+                        if path.is_dir():
+                            for ctrl in path.iterdir():
+                                if session := self.__parse_session_dir(ctrl, port_index_to_id):
+                                    session['subsys_id'] = subsys_id
+                                    sessions.append(session)
+        else:
+            if not self.middleware.call_sync('nvmet.spdk.nvmf_ready'):
+                return sessions
+            client = make_client()
+            choices = {}
+            port_to_portid = {}
+            for port in ports:
+                addr = port['addr_traddr']
+                trtype = port['addr_trtype']
+                addrs = [addr]
+                if ha:
+                    try:
+                        mychoices = choices[trtype]
+                    except KeyError:
+                        mychoices = self.middleware.call_sync('nvmet.port.transport_address_choices', trtype, True)
+                        choices[trtype] = mychoices
+                    if pair := mychoices.get(addr):
+                        addrs.extend(pair.split('/'))
+
+                for addr in addrs:
+                    port_to_portid[f"{trtype}:{addr}:{port['addr_trsvcid']}"] = port['id']
+            for subsys in subsystems:
+                if subsys_id is not None and subsys_id != subsys['id']:
+                    continue
+                for entry in nvmf_subsystem_get_qpairs(client, subsys['subnqn']):
+                    laddr = entry['listen_address']
+                    key = f"{laddr['trtype']}:{laddr['traddr']}:{laddr['trsvcid']}"
+                    if port_id := port_to_portid.get(key):
+                        session = {
+                            'host_traddr': entry.get('peer_address', {}).get('traddr'),
+                            'hostnqn': entry['hostnqn'],
+                            'subsys_id': subsys['id'],
+                            'port_id': port_id,
+                            'ctrl': entry['cntlid']
+                        }
+                        # Do we really care that we might have multiple duplicate connections
+                        # (only visible delta being a different qid and peer_address.trsvcid)?
+                        # If so, replace this with a simple append.
+                        if session not in sessions:
+                            sessions.append(session)
         return sessions
@@ -228,7 +285,7 @@ async def running(self):
         if (await self.config())['kernel']:
             return await self.middleware.run_in_thread(nvmet_kernel_module_loaded)
         else:
-            return False
+            return await self.middleware.call('service.started', NVMF_SERVICE)
 
     @private
     async def reload(self):
@@ -237,14 +294,22 @@ async def reload(self):
 
     @private
     async def start(self):
-        await self.middleware.call('nvmet.global.load_kernel_modules')
+        if (await self.config())['kernel']:
+            await self.middleware.call('nvmet.global.load_kernel_modules')
+        else:
+            await self.middleware.call('nvmet.spdk.slots')
+            if await self.middleware.call('service.start', NVMF_SERVICE):
+                await self.middleware.call('nvmet.spdk.wait_nvmf_ready')
         await self.middleware.call('etc.generate', 'nvmet')
 
     @private
     async def stop(self):
         if await self.running():
-            await self.middleware.run_in_thread(clear_config)
-            await self.middleware.call('nvmet.global.unload_kernel_modules')
+            if (await self.config())['kernel']:
+                await self.middleware.run_in_thread(clear_config)
+                await self.middleware.call('nvmet.global.unload_kernel_modules')
+            else:
+                return await self.middleware.call('service.stop', NVMF_SERVICE)
 
     @private
     async def system_ready(self):
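A sketch of the "trtype:traddr:trsvcid" keying that local_sessions() uses to map SPDK qpairs back to port rows; the qpair dict below is illustrative, not real RPC output:

```python
# Hypothetical port lookup table built from nvmet.port.query.
port_to_portid = {'TCP:10.0.0.5:4420': 17}

qpair = {
    'listen_address': {'trtype': 'TCP', 'traddr': '10.0.0.5', 'trsvcid': '4420'},
    'peer_address': {'traddr': '10.0.0.99', 'trsvcid': '51234'},
    'hostnqn': 'nqn.2014-08.org.nvmexpress:uuid:deadbeef',
    'cntlid': 1,
}

# Rebuild the same key from the qpair's listen address to find the port row.
laddr = qpair['listen_address']
key = f"{laddr['trtype']}:{laddr['traddr']}:{laddr['trsvcid']}"
assert port_to_portid[key] == 17
```

Note that on HA systems both halves of the "addrA/addrB" choices pair are keyed to the same port id, so a qpair seen on either node's physical address resolves to the one configured (VIP) port.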
diff --git a/src/middlewared/middlewared/plugins/nvmet/namespace.py b/src/middlewared/middlewared/plugins/nvmet/namespace.py
index 1240b6b3ad767..a09449976e088 100644
--- a/src/middlewared/middlewared/plugins/nvmet/namespace.py
+++ b/src/middlewared/middlewared/plugins/nvmet/namespace.py
@@ -17,9 +17,9 @@
 from middlewared.service import SharingService, ValidationErrors, private
 from middlewared.service_exception import CallError, MatchNotFound
 from .constants import NAMESPACE_DEVICE_TYPE
-from .kernel import lock_namespace as kernel_lock_namespace
-from .kernel import unlock_namespace as kernel_unlock_namespace
-from .kernel import resize_namespace as kernel_resize_namespace
+from middlewared.utils.nvmet.kernel import lock_namespace as kernel_lock_namespace
+from middlewared.utils.nvmet.kernel import unlock_namespace as kernel_unlock_namespace
+from middlewared.utils.nvmet.kernel import resize_namespace as kernel_resize_namespace
 
 UUID_GENERATE_RETRIES = 10
 NSID_SEARCH_RANGE = 0xFFFF  # This is much less than NSID, but good enough for practical purposes.
diff --git a/src/middlewared/middlewared/plugins/nvmet/port.py b/src/middlewared/middlewared/plugins/nvmet/port.py
index 282ef052b2adf..69d68644a1782 100644
--- a/src/middlewared/middlewared/plugins/nvmet/port.py
+++ b/src/middlewared/middlewared/plugins/nvmet/port.py
@@ -36,6 +36,7 @@ class NVMetPortModel(sa.Model):
     nvmet_port_max_queue_size = sa.Column(sa.Integer(), nullable=True, default=None)
     nvmet_port_pi_enable = sa.Column(sa.Boolean(), nullable=True, default=None)
     nvmet_port_enabled = sa.Column(sa.Boolean())
+    nvmet_port_dedicated_nic = sa.Column(sa.Boolean(), default=False)
 
 
 class NVMetPortService(CRUDService):
@@ -276,6 +277,25 @@ async def __validate(self, verrors, data, schema_name, old=None):
         except MatchNotFound:
             existing = None
 
+        # Dedicated NIC checks only apply if SPDK is running:
+        if running := await self.middleware.call('nvmet.global.running'):
+            if not (await self.middleware.call('nvmet.global.config'))['kernel']:
+                # SPDK is running
+                if data['dedicated_nic'] and not old:
+                    # Cannot add a *new* dedicated NIC.
+                    # Check to see if any ports already have this NIC as dedicated.
+                    filters = [
+                        ['addr_trtype', '=', data['addr_trtype']],
+                        ['addr_traddr', '=', data['addr_traddr']],
+                        ['dedicated_nic', '=', True],
+                    ]
+                    if not await self.middleware.call('nvmet.port.query', filters):
+                        verrors.add(f'{schema_name}.dedicated_nic',
+                                    'Cannot add dedicated_nic when service is running.')
+                elif old and old['dedicated_nic'] != data['dedicated_nic']:
+                    verrors.add(f'{schema_name}.dedicated_nic',
+                                'Cannot modify dedicated_nic when service is running.')
+
         if old is None:
             # Create
             # Ensure that we're not duplicating an existing entry
@@ -295,11 +315,11 @@ async def __validate(self, verrors, data, schema_name, old=None):
                                        [['port.id', '=', old['id']]],
                                        {'count': True}):
             # Have some subsystems attached to the port
-            if old['enabled'] and await self.middleware.call('nvmet.global.running'):
-                # port is enabled and running
-                # Ensure we're only changing enabled
+            if old['enabled'] and running:
+                # port is enabled and running. Ensure we're only changing enabled
+                # (or dedicated_nic, which is checked separately above).
                 for key, oldvalue in old.items():
-                    if key == 'enabled':
+                    if key in ['enabled', 'dedicated_nic']:
                         continue
                     if data[key] == oldvalue:
                         continue
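A condensed restatement of the dedicated_nic validation above as a pure function (hypothetical helper, for review only): while SPDK is running the dedicated NIC set cannot change, so a new port may claim dedicated_nic only if some existing port already dedicates the same NIC, and an existing port may never flip the flag.

```python
def dedicated_nic_error(running: bool, kernel: bool, creating: bool,
                        wants_dedicated: bool, nic_already_dedicated: bool,
                        flag_changed: bool) -> str | None:
    # No restrictions unless the SPDK backend is actually running.
    if not running or kernel:
        return None
    if creating and wants_dedicated and not nic_already_dedicated:
        return 'Cannot add dedicated_nic when service is running.'
    if not creating and flag_changed:
        return 'Cannot modify dedicated_nic when service is running.'
    return None

# New port on a NIC nobody dedicates yet: rejected while running.
assert dedicated_nic_error(True, False, True, True, False, False)
# New port on a NIC another port already dedicates: allowed.
assert dedicated_nic_error(True, False, True, True, True, False) is None
```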
diff --git a/src/middlewared/middlewared/plugins/nvmet/spdk.py b/src/middlewared/middlewared/plugins/nvmet/spdk.py
new file mode 100644
index 0000000000000..7450bb6ca92e1
--- /dev/null
+++ b/src/middlewared/middlewared/plugins/nvmet/spdk.py
@@ -0,0 +1,151 @@
+import asyncio
+import os
+
+from middlewared.service import CallError, Service
+from middlewared.utils import run
+from middlewared.utils.nvmet.spdk import nvmf_ready
+
+from .constants import PORT_ADDR_FAMILY, PORT_TRTYPE
+
+SETUP_SH = '/opt/spdk/scripts/setup.sh'
+
+
+class NVMetSPDKService(Service):
+
+    class Config:
+        private = True
+        namespace = 'nvmet.spdk'
+
+    async def _run_setup(self, *args, **kwargs):
+        command = [SETUP_SH, *args]
+        cp = await run(command, **kwargs)
+        if cp.returncode:
+            return False
+        return True
+
+    async def setup(self):
+        """
+        Perform necessary setup for SPDK.
+
+        Allocate hugepages and bind PCI devices.
+        """
+        _slots = await self.slots()
+        my_env = os.environ.copy()
+        if _slots:
+            my_env['PCI_ALLOWED'] = " ".join(_slots)
+        else:
+            my_env['PCI_ALLOWED'] = "none"
+        return await self._run_setup('config', env=my_env)
+
+    async def reset(self):
+        """
+        Rebind PCI devices back to their original drivers.
+
+        Also clean up any leftover SPDK files/resources.
+        Hugepage memory size will remain unchanged.
+        """
+        return await self._run_setup('reset')
+
+    async def cleanup(self):
+        """
+        Remove any orphaned files that can be left in the system after SPDK application exit.
+        """
+        return await self._run_setup('cleanup')
+
+    async def slots(self):
+        _nics = await self.dedicated_nics()
+        return await self.middleware.call('nvmet.spdk.pci_slots', _nics)
+
+    def pci_slots(self, nics):
+        pci_slots = []
+        for nic in nics:
+            with open(f'/sys/class/net/{nic}/device/uevent', 'r') as f:
+                for line in f:
+                    if line.startswith('PCI_SLOT_NAME='):
+                        if slot := line.strip().split('=', 1)[1]:
+                            pci_slots.append(slot)
+                        break
+        if len(nics) != len(pci_slots):
+            raise CallError("Could not find PCI slot for every NIC")
+        return pci_slots
+
+    async def dedicated_nics(self):
+        """
+        Return a list of NIC names corresponding to all configured dedicated NVMe-oF ports.
+        """
+        # Check that kernel nvmet is not enabled
+        if (await self.middleware.call('nvmet.global.config'))['kernel']:
+            raise CallError("NVMe-oF configured for kernel target")
+
+        # Need to obtain the PCI devices associated with configured ports.
+        ports = await self.middleware.call('nvmet.port.query', [['dedicated_nic', '=', True]])
+        if not ports:
+            # It's OK if there are no dedicated ports configured. That'll mean that SPDK falls
+            # back to using the regular kernel drivers for NICs.
+            return []
+
+        # For the time being we only support TCP/RDMA with IPv4/IPv6
+        if do_failover := await self.middleware.call('failover.licensed'):
+            node = await self.middleware.call('failover.node')
+            choices = {}
+        addresses = set()
+        for port in ports:
+            if port['addr_trtype'] not in [PORT_TRTYPE.TCP.api, PORT_TRTYPE.RDMA.api]:
+                raise CallError(f"Unsupported addr_trtype: {port['addr_trtype']!r}")
+            if port['addr_adrfam'] not in [PORT_ADDR_FAMILY.IPV4.api, PORT_ADDR_FAMILY.IPV6.api]:
+                raise CallError(f"Unsupported addr_adrfam: {port['addr_adrfam']!r}")
+            if do_failover:
+                # HA: get the non-VIP address (this works on MASTER too)
+                trtype = port['addr_trtype']
+                if trtype not in choices:
+                    choices[trtype] = await self.middleware.call('nvmet.port.transport_address_choices', trtype, True)
+                try:
+                    pair = choices[trtype][port['addr_traddr']]
+                except KeyError:
+                    continue
+                match node:
+                    case 'A':
+                        addresses.add(pair.split('/')[0])
+                    case 'B':
+                        addresses.add(pair.split('/')[1])
+            else:
+                # Not HA, just use whatever address is in the config
+                addresses.add(port['addr_traddr'])
+
+        if not addresses:
+            raise CallError("No IP addresses configured for NVMe-oF")
+
+        # Now query the interfaces to discover which ones are being used
+        nics = []
+        iface_filter = [['OR', [
+            ['state.aliases.*.address', 'in', addresses],
+            ['state.failover_virtual_aliases.*.address', 'in', addresses]
+        ]]]
+        interfaces = await self.middleware.call('interface.query', iface_filter)
+        for address in addresses:
+            name = self._address_to_iface_name(address, interfaces)
+            if not name:
+                raise CallError(f"Could not find interface for address: {address}")
+            nics.append(name)
+
+        return nics
+
+    def _address_to_iface_name(self, address, interfaces):
+        for iface in interfaces:
+            for alias in iface.get('state', {}).get('aliases', []):
+                if alias.get('address') == address:
+                    return iface['name']
+            for alias in iface.get('state', {}).get('failover_virtual_aliases', []):
+                if alias.get('address') == address:
+                    return iface['name']
+
+    def nvmf_ready(self, cheap=False):
+        return nvmf_ready(cheap)
+
+    async def wait_nvmf_ready(self, retries=10):
+        while retries > 0:
+            if await self.middleware.call('nvmet.spdk.nvmf_ready'):
+                return True
+            await asyncio.sleep(1)
+            retries -= 1
+        return False
diff --git a/src/middlewared/middlewared/plugins/service_/services/all.py b/src/middlewared/middlewared/plugins/service_/services/all.py
index 46a91f945607f..6556539824e18 100644
--- a/src/middlewared/middlewared/plugins/service_/services/all.py
+++ b/src/middlewared/middlewared/plugins/service_/services/all.py
@@ -30,6 +30,7 @@
     NfsMountdService,
     NtpdService,
     NVMETargetService,
+    NVMfService,
     OpenVmToolsService,
     PowerdService,
     RcService,
@@ -73,6 +74,7 @@
     NfsMountdService,
     NtpdService,
     NVMETargetService,
+    NVMfService,
     PowerdService,
     RcService,
     ResolvConfService,
diff --git a/src/middlewared/middlewared/plugins/service_/services/pseudo/misc.py b/src/middlewared/middlewared/plugins/service_/services/pseudo/misc.py
index 72b7e1fcc56c0..bc32765048578 100644
--- a/src/middlewared/middlewared/plugins/service_/services/pseudo/misc.py
+++ b/src/middlewared/middlewared/plugins/service_/services/pseudo/misc.py
@@ -223,3 +223,17 @@ async def get_state(self):
             (await self.middleware.call('nvmet.global.running')),
             [],
         )
+
+    async def failure_logs(self):
+        if (await self.middleware.call('nvmet.global.config'))['kernel']:
+            return None
+        else:
+            service_object = await self.middleware.call('service.object', "nvmf")
+            return await service_object.failure_logs()
+
+
+class NVMfService(SimpleService):
+    name = "nvmf"
+    reloadable = True
+    etc = ["nvmet"]
+    systemd_unit = "ix-nvmf"
{"cpu0": "cpu8", "cpu1": "cpu9"})""" +CpuFlags = list[str] + + @functools.cache def cpu_info() -> CpuInfo: return cpu_info_impl() @@ -137,3 +140,18 @@ def amd_cpu_temperatures(amd_metrics: dict) -> dict: return dict(enumerate([amd_sensors['temp1']] * core_count)) elif 'temp1_input' in amd_sensors['temp1']: return dict(enumerate([amd_sensors['temp1']['temp1_input']] * core_count)) + + +@functools.cache +def cpu_flags() -> dict[int, CpuFlags]: + result = {} + with open('/proc/cpuinfo', 'rb') as f: + for line in filter(lambda x: x.startswith((b'processor', b'flags')), f): + parts = line.decode('utf-8').split(':', 1) + title = parts[0].strip() + match title: + case 'processor': + cpu_number = int(parts[1].strip()) + case 'flags': + result[cpu_number] = parts[1].strip().split() + return result diff --git a/src/middlewared/middlewared/utils/nvmet/__init__.py b/src/middlewared/middlewared/utils/nvmet/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/middlewared/middlewared/plugins/nvmet/kernel.py b/src/middlewared/middlewared/utils/nvmet/kernel.py similarity index 89% rename from src/middlewared/middlewared/plugins/nvmet/kernel.py rename to src/middlewared/middlewared/utils/nvmet/kernel.py index 5a9a0167c8c5c..e0fe2e506d7d0 100644 --- a/src/middlewared/middlewared/plugins/nvmet/kernel.py +++ b/src/middlewared/middlewared/utils/nvmet/kernel.py @@ -5,19 +5,24 @@ from collections import defaultdict from contextlib import contextmanager +from middlewared.plugins.nvmet.constants import (DHCHAP_DHGROUP, + DHCHAP_HASH, + NAMESPACE_DEVICE_TYPE, + NVMET_KERNEL_CONFIG_DIR, + PORT_ADDR_FAMILY, + PORT_TRTYPE) from middlewared.plugins.zfs_.utils import zvol_name_to_path -from .constants import (DHCHAP_DHGROUP, - DHCHAP_HASH, - NAMESPACE_DEVICE_TYPE, - NVMET_KERNEL_CONFIG_DIR, - NVMET_NODE_A_ANA_GRPID, - NVMET_NODE_B_ANA_GRPID, - PORT_ADDR_FAMILY, - PORT_TRTYPE) - -ANA_OPTIMIZED_STATE = 'optimized' -ANA_INACCESSIBLE_STATE = 'inaccessible' -ANA_PORT_INDEX_OFFSET = 5000 +from .render_common import (ANA_PORT_INDEX_OFFSET, + NVMET_DEFAULT_ANA_GRPID, + NVMET_NODE_A_ANA_GRPID, + NVMET_NODE_A_MAX_CONTROLLER_ID, + NVMET_NODE_B_ANA_GRPID, + NVMET_NODE_B_MIN_CONTROLLER_ID, + addr_traddr_to_address, + ana_grpid, + ana_state, + port_subsys_index, + subsys_ana) class NvmetConfig: @@ -150,16 +155,9 @@ def map_attrs(self, attrs: dict, render_ctx: dict): case 'addr_adrfam': result[k] = PORT_ADDR_FAMILY.by_api(v).sysfs case 'addr_traddr': - result[k] = v - if attrs.get('index', 0) > ANA_PORT_INDEX_OFFSET: - prefix = attrs['addr_trtype'].lower() - choices = render_ctx[f'{prefix}.nvmet.port.transport_address_choices'] - pair = choices[v].split('/') - match render_ctx['failover.node']: - case 'A': - result[k] = pair[0] - case 'B': - result[k] = pair[1] + result[k] = addr_traddr_to_address(attrs.get('index', 0), + attrs['addr_trtype'], + v, render_ctx) case 'addr_trsvcid': result[k] = v case 'inline_data_size': @@ -190,7 +188,7 @@ def ensure_ana_state(self, path: pathlib.Path, render_ctx: dict): ana_path.mkdir(exist_ok=True) ana_state_path = pathlib.Path(ana_path, 'ana_state') cur_state = ana_state_path.read_text().strip() - new_state = ANA_OPTIMIZED_STATE if render_ctx['failover.status'] == 'MASTER' else ANA_INACCESSIBLE_STATE + new_state = ana_state(render_ctx) if cur_state != new_state: ana_state_path.write_text(f'{new_state}\n') else: @@ -399,9 +397,9 @@ def map_attrs(self, attrs: dict, render_ctx: dict): # Perhaps inject some values match render_ctx['failover.node']: case 'A': - 
diff --git a/src/middlewared/middlewared/utils/nvmet/__init__.py b/src/middlewared/middlewared/utils/nvmet/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/src/middlewared/middlewared/plugins/nvmet/kernel.py b/src/middlewared/middlewared/utils/nvmet/kernel.py
similarity index 89%
rename from src/middlewared/middlewared/plugins/nvmet/kernel.py
rename to src/middlewared/middlewared/utils/nvmet/kernel.py
index 5a9a0167c8c5c..e0fe2e506d7d0 100644
--- a/src/middlewared/middlewared/plugins/nvmet/kernel.py
+++ b/src/middlewared/middlewared/utils/nvmet/kernel.py
@@ -5,19 +5,24 @@
 from collections import defaultdict
 from contextlib import contextmanager
 
+from middlewared.plugins.nvmet.constants import (DHCHAP_DHGROUP,
+                                                 DHCHAP_HASH,
+                                                 NAMESPACE_DEVICE_TYPE,
+                                                 NVMET_KERNEL_CONFIG_DIR,
+                                                 PORT_ADDR_FAMILY,
+                                                 PORT_TRTYPE)
 from middlewared.plugins.zfs_.utils import zvol_name_to_path
-from .constants import (DHCHAP_DHGROUP,
-                        DHCHAP_HASH,
-                        NAMESPACE_DEVICE_TYPE,
-                        NVMET_KERNEL_CONFIG_DIR,
-                        NVMET_NODE_A_ANA_GRPID,
-                        NVMET_NODE_B_ANA_GRPID,
-                        PORT_ADDR_FAMILY,
-                        PORT_TRTYPE)
-
-ANA_OPTIMIZED_STATE = 'optimized'
-ANA_INACCESSIBLE_STATE = 'inaccessible'
-ANA_PORT_INDEX_OFFSET = 5000
+from .render_common import (ANA_PORT_INDEX_OFFSET,
+                            NVMET_DEFAULT_ANA_GRPID,
+                            NVMET_NODE_A_ANA_GRPID,
+                            NVMET_NODE_A_MAX_CONTROLLER_ID,
+                            NVMET_NODE_B_ANA_GRPID,
+                            NVMET_NODE_B_MIN_CONTROLLER_ID,
+                            addr_traddr_to_address,
+                            ana_grpid,
+                            ana_state,
+                            port_subsys_index,
+                            subsys_ana)
 
 
 class NvmetConfig:
@@ -150,16 +155,9 @@ def map_attrs(self, attrs: dict, render_ctx: dict):
                 case 'addr_adrfam':
                     result[k] = PORT_ADDR_FAMILY.by_api(v).sysfs
                 case 'addr_traddr':
-                    result[k] = v
-                    if attrs.get('index', 0) > ANA_PORT_INDEX_OFFSET:
-                        prefix = attrs['addr_trtype'].lower()
-                        choices = render_ctx[f'{prefix}.nvmet.port.transport_address_choices']
-                        pair = choices[v].split('/')
-                        match render_ctx['failover.node']:
-                            case 'A':
-                                result[k] = pair[0]
-                            case 'B':
-                                result[k] = pair[1]
+                    result[k] = addr_traddr_to_address(attrs.get('index', 0),
+                                                       attrs['addr_trtype'],
+                                                       v, render_ctx)
                 case 'addr_trsvcid':
                     result[k] = v
                 case 'inline_data_size':
@@ -190,7 +188,7 @@ def ensure_ana_state(self, path: pathlib.Path, render_ctx: dict):
                 ana_path.mkdir(exist_ok=True)
                 ana_state_path = pathlib.Path(ana_path, 'ana_state')
                 cur_state = ana_state_path.read_text().strip()
-                new_state = ANA_OPTIMIZED_STATE if render_ctx['failover.status'] == 'MASTER' else ANA_INACCESSIBLE_STATE
+                new_state = ana_state(render_ctx)
                 if cur_state != new_state:
                     ana_state_path.write_text(f'{new_state}\n')
             else:
@@ -399,9 +397,9 @@ def map_attrs(self, attrs: dict, render_ctx: dict):
         # Perhaps inject some values
         match render_ctx['failover.node']:
             case 'A':
-                result['attr_cntlid_max'] = 31999
+                result['attr_cntlid_max'] = NVMET_NODE_A_MAX_CONTROLLER_ID
             case 'B':
-                result['attr_cntlid_min'] = 32000
+                result['attr_cntlid_min'] = NVMET_NODE_B_MIN_CONTROLLER_ID
 
         result['attr_model'] = render_ctx['nvmet.subsys.model']
         result['attr_firmware'] = render_ctx['nvmet.subsys.firmware']
@@ -480,23 +478,11 @@ def map_attrs(self, attrs: dict, render_ctx: dict):
         result['buffered_io'] = NAMESPACE_DEVICE_TYPE.by_api(attrs['device_type']).sysfs
         result['resv_enable'] = 1
 
-        result['ana_grpid'] = 1
+        result['ana_grpid'] = NVMET_DEFAULT_ANA_GRPID
 
-        do_ana = False  # Is ANA active for this namespace (subsystem)
-        if render_ctx['nvmet.global.ana_active']:
-            # Maybe ANA applies to this namespace
-            if isinstance(subsys_ana := attrs['subsys']['ana'], bool):
-                do_ana = subsys_ana
-            else:
-                do_ana = bool(render_ctx['nvmet.global.ana_enabled'])
-
-        if do_ana:
-            match render_ctx['failover.node']:
-                case 'A':
-                    result['ana_grpid'] = NVMET_NODE_A_ANA_GRPID
-                case 'B':
-                    result['ana_grpid'] = NVMET_NODE_B_ANA_GRPID
+        if subsys_ana(attrs['subsys'], render_ctx):
+            result['ana_grpid'] = ana_grpid(render_ctx)
 
         match render_ctx['failover.status']:
             case 'SINGLE' | 'MASTER':
@@ -591,28 +577,8 @@ class NvmetPortSubsysConfig(NvmetLinkConfig):
     dst_query_keys = ['subsys', 'subnqn']
 
     def src_dir_name(self, entry, render_ctx: dict):
-        # Because we have elected to support overriding the global ANA
-        # setting for individual subsystems this has two knock-on effects
-        # 1. Additional ANA-specific port indexes are in injected
-        # 2. Particular subsystems will link to either the ANA or non-ANA
-        #    port index.
-        # However, if we're on the standby node we never want to setup
-        # a link to the VIP port.
-        raw_index = entry[self.src_query_keys[0]][self.src_query_keys[1]]
-        # Now check whether ANA is playing a part.
-        match entry['subsys']['ana']:
-            case True:
-                index = raw_index + ANA_PORT_INDEX_OFFSET
-            case False:
-                index = raw_index
-            case _:
-                if render_ctx['nvmet.global.ana_enabled']:
-                    index = raw_index + ANA_PORT_INDEX_OFFSET
-                else:
-                    index = raw_index
-
-        if index < ANA_PORT_INDEX_OFFSET and render_ctx['failover.status'] == 'BACKUP':
-            return None
+        if (index := port_subsys_index(entry, render_ctx)) is None:
+            return index
         return str(index)
diff --git a/src/middlewared/middlewared/utils/nvmet/render_common.py b/src/middlewared/middlewared/utils/nvmet/render_common.py
new file mode 100644
index 0000000000000..b88683336b849
--- /dev/null
+++ b/src/middlewared/middlewared/utils/nvmet/render_common.py
@@ -0,0 +1,94 @@
+ANA_OPTIMIZED_STATE = 'optimized'
+ANA_INACCESSIBLE_STATE = 'inaccessible'
+ANA_PORT_INDEX_OFFSET = 5000
+
+NVMET_DEFAULT_ANA_GRPID = 1
+NVMET_NODE_A_ANA_GRPID = 2
+NVMET_NODE_B_ANA_GRPID = 3
+
+NVMET_NODE_A_MAX_CONTROLLER_ID = 31999
+NVMET_NODE_B_MIN_CONTROLLER_ID = 32000
+
+
+def addr_traddr_to_address(index, addr_trtype, addr_traddr, render_ctx):
+    result = addr_traddr
+    if index > ANA_PORT_INDEX_OFFSET:
+        choices = render_ctx[f'{addr_trtype.lower()}.nvmet.port.transport_address_choices']
+        pair = choices[addr_traddr].split('/')
+        match render_ctx['failover.node']:
+            case 'A':
+                result = pair[0]
+            case 'B':
+                result = pair[1]
+    return result
+
+
+def port_subsys_index(entry, render_ctx) -> int | None:
+    # Because we have elected to support overriding the global ANA
+    # setting for individual subsystems this has two knock-on effects:
+    # 1. Additional ANA-specific port indexes are injected
+    # 2. Particular subsystems will link to either the ANA or non-ANA
+    #    port index.
+    # However, if we're on the standby node we never want to set up
+    # a link to the VIP port.
+    raw_index = entry['port']['index']
+    # Now check whether ANA is playing a part.
+    match entry['subsys']['ana']:
+        case True:
+            index = raw_index + ANA_PORT_INDEX_OFFSET
+        case False:
+            index = raw_index
+        case _:
+            if render_ctx['nvmet.global.ana_enabled']:
+                index = raw_index + ANA_PORT_INDEX_OFFSET
+            else:
+                index = raw_index
+    if index < ANA_PORT_INDEX_OFFSET and render_ctx['failover.status'] == 'BACKUP':
+        return None
+    return index
+
+
+def ana_state(render_ctx):
+    return ANA_OPTIMIZED_STATE if render_ctx['failover.status'] == 'MASTER' else ANA_INACCESSIBLE_STATE
+
+
+def ana_grpid(render_ctx):
+    match render_ctx['failover.node']:
+        case 'A':
+            return NVMET_NODE_A_ANA_GRPID
+        case 'B':
+            return NVMET_NODE_B_ANA_GRPID
+        case _:
+            return NVMET_DEFAULT_ANA_GRPID
+
+
+def subsys_ana(subsys, render_ctx) -> bool:
+    if not render_ctx['failover.licensed']:
+        return False
+
+    if render_ctx['nvmet.global.ana_enabled']:
+        if subsys['ana'] is False:
+            return False
+        return True
+    else:
+        if subsys['ana']:
+            return True
+        return False
+
+
+def subsys_visible(subsys, render_ctx) -> bool:
+    match render_ctx['failover.status']:
+        case 'SINGLE' | 'MASTER':
+            return True
+        case 'BACKUP':
+            # Depends on the various ANA settings
+            match subsys['ana']:
+                case True:
+                    return True
+                case False:
+                    return False
+                case _:
+                    if render_ctx['nvmet.global.ana_enabled']:
+                        return True
+                    else:
+                        return False
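A worked example of the index arithmetic in port_subsys_index(), assuming raw port indexes always sit below the 5000 offset:

```python
ANA_PORT_INDEX_OFFSET = 5000

def effective_index(raw_index, subsys_ana, global_ana_enabled, failover_status):
    # A per-subsystem True/False overrides the global ANA setting; None inherits it.
    ana = subsys_ana if isinstance(subsys_ana, bool) else global_ana_enabled
    index = raw_index + ANA_PORT_INDEX_OFFSET if ana else raw_index
    # A non-ANA (VIP) port must never be linked on the standby node.
    if index < ANA_PORT_INDEX_OFFSET and failover_status == 'BACKUP':
        return None
    return index

assert effective_index(3, None, True, 'MASTER') == 5003   # inherits global ANA
assert effective_index(3, False, True, 'BACKUP') is None  # VIP port on standby
assert effective_index(3, True, False, 'BACKUP') == 5003  # per-subsystem override
```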
diff --git a/src/middlewared/middlewared/utils/nvmet/spdk.py b/src/middlewared/middlewared/utils/nvmet/spdk.py
new file mode 100644
index 0000000000000..20b6639ed431b
--- /dev/null
+++ b/src/middlewared/middlewared/utils/nvmet/spdk.py
@@ -0,0 +1,699 @@
+import copy
+import hashlib
+import os
+import tempfile
+from contextlib import contextmanager
+
+from spdk import rpc
+
+from middlewared.plugins.nvmet.constants import (NAMESPACE_DEVICE_TYPE,
+                                                 NVMET_DISCOVERY_NQN,
+                                                 PORT_ADDR_FAMILY)
+from .render_common import (ANA_INACCESSIBLE_STATE,
+                            ANA_OPTIMIZED_STATE,
+                            ANA_PORT_INDEX_OFFSET,
+                            NVMET_NODE_A_MAX_CONTROLLER_ID,
+                            NVMET_NODE_B_MIN_CONTROLLER_ID,
+                            addr_traddr_to_address,
+                            ana_grpid,
+                            ana_state,
+                            port_subsys_index,
+                            subsys_ana,
+                            subsys_visible)
+
+SPDK_RPC_SERVER_ADDR = '/var/run/spdk/spdk.sock'
+SPDK_RPC_PORT = 5260
+SPDK_RPC_TIMEOUT = None
+SPDK_RPC_LOG_LEVEL = 'ERROR'
+SPDK_RPC_CONN_RETRIES = 0
+
+# Directory into which we will place our keys
+SPDK_KEY_DIR = '/var/run/spdk/keys'
+
+
+def host_config_key(config_item, key_type):
+    if key_value := config_item[key_type]:
+        md5_hash = hashlib.md5()
+        md5_hash.update(key_value.encode('utf-8'))
+        # Colon confuses things, replace
+        return f'{key_type}-{config_item["hostnqn"].replace(":", "-")}-{md5_hash.hexdigest()}'
+
+
+def nvmf_ready(cheap=False):
+    if os.path.exists(SPDK_RPC_SERVER_ADDR):
+        if cheap:
+            return True
+        try:
+            client = make_client()
+            rpc.framework_wait_init(client)
+            return True
+        except Exception:
+            pass
+    return False
+
+
+class NvmetConfig:
+    DEBUG = False
+
+    def config_key(self, config_item, render_ctx):
+        return str(config_item[self.query_key])
+
+    def config_dict(self, render_ctx):
+        # Implemented so that any class can skip entries by returning
+        # a config_key of None
+        result = {}
+        for entry in render_ctx[self.query]:
+            if (key := self.config_key(entry, render_ctx)) is not None:
+                result[key] = entry
+        return result
+
+    def debug_title(self, title):
+        outstr = f'{title} ({self.__class__.__name__})'
+        print(outstr)
+        print('=' * len(outstr))
+
+    def debug(self, live, config):
+        if self.DEBUG:
+            import pprint
+            self.debug_title('LIVE')
+            pprint.pprint(live)
+            print()
+
+            self.debug_title('CONFIG')
+            pprint.pprint(config)
+            print()
+
+    @contextmanager
+    def render(self, client, render_ctx: dict):
+        live = self.get_live(client, render_ctx)
+        config = self.config_dict(render_ctx)
+        config_keys = set(config.keys())
+        live_keys = set(live.keys())
+        add_keys = config_keys - live_keys
+        remove_keys = live_keys - config_keys
+        remove_keys.discard(NVMET_DISCOVERY_NQN)
+        update_keys = config_keys - remove_keys - add_keys
+
+        self.debug(live, config)
+
+        for item in add_keys:
+            self.add(client, config[item], render_ctx)
+
+        for item in update_keys:
+            self.update(client, config[item], live[item], render_ctx)
+
+        yield
+
+        for item in remove_keys:
+            self.delete(client, live[item], render_ctx)
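Why render() yields between creates and deletes: nesting the context managers makes all creates run outer-to-inner and all deletes run inner-to-outer, so dependents are created after, and destroyed before, the objects they depend on. A minimal sketch:

```python
from contextlib import contextmanager

@contextmanager
def render(name):
    # Creates/updates happen on entry, deletes on exit.
    print(f'{name}: add/update')
    yield
    print(f'{name}: delete')

with render('subsys'), render('port'), render('namespace'):
    pass
# subsys: add/update
# port: add/update
# namespace: add/update
# namespace: delete
# port: delete
# subsys: delete
```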
+class NvmetSubsysConfig(NvmetConfig):
+    query = 'nvmet.subsys.query'
+    query_key = 'subnqn'
+
+    def config_key(self, config_item, render_ctx):
+        if subsys_visible(config_item, render_ctx):
+            return str(config_item[self.query_key])
+
+    def get_live(self, client, render_ctx):
+        return {subsys['nqn']: subsys for subsys in rpc.nvmf.nvmf_get_subsystems(client)}
+
+    def add(self, client, config_item, render_ctx):
+        kwargs = {
+            'nqn': config_item['subnqn'],
+            'serial_number': config_item['serial'],
+            'allow_any_host': config_item['allow_any_host'],
+            'model_number': render_ctx['nvmet.subsys.model'],
+        }
+
+        # Perhaps inject some values
+        match render_ctx['failover.node']:
+            case 'A':
+                kwargs['max_cntlid'] = NVMET_NODE_A_MAX_CONTROLLER_ID
+            case 'B':
+                kwargs['min_cntlid'] = NVMET_NODE_B_MIN_CONTROLLER_ID
+
+        if render_ctx['failover.licensed']:
+            kwargs['ana_reporting'] = True
+
+        rpc.nvmf.nvmf_create_subsystem(client, **kwargs)
+
+    def update(self, client, config_item, live_item, render_ctx):
+        if config_item['allow_any_host'] != live_item['allow_any_host']:
+            # The wrapper function inverts the parameter, so use NOT
+            rpc.nvmf.nvmf_subsystem_allow_any_host(client, config_item['subnqn'], not config_item['allow_any_host'])
+
+    def delete(self, client, live_item, render_ctx):
+        rpc.nvmf.nvmf_delete_subsystem(client, nqn=live_item['nqn'])
+
+
+class NvmetTransportConfig:
+    """
+    There currently is no mechanism to unload transports, so in the render
+    we will simply add them if necessary.
+    """
+    @contextmanager
+    def render(self, client, render_ctx: dict):
+        # Create a set of the transports demanded by the config
+        required = set([port['addr_trtype'] for port in render_ctx['nvmet.port.query']])
+        current = set([transport['trtype'] for transport in rpc.nvmf.nvmf_get_transports(client)])
+        for transport in required - current:
+            rpc.nvmf.nvmf_create_transport(client, trtype=transport)
+        yield
+
+
+class NvmetPortConfig(NvmetConfig):
+    """
+    SPDK doesn't have a separate definition of ports; listeners are attached
+    to subsystems. We model this by attaching the port config's listeners to
+    the discovery subsystem, and use port_subsys for the other subsystems.
+    """
+    query = 'nvmet.port.query'
+    query_key = 'subnqn'
+
+    def config_dict(self, render_ctx):
+        # For ports we may want to inject or remove ports wrt the ANA
+        # settings.  ANA ports will be offset by ANA_PORT_INDEX_OFFSET (5000).
+        config = {}
+        non_ana_port_ids = render_ctx['nvmet.port.usage']['non_ana_port_ids']
+        ana_port_ids = render_ctx['nvmet.port.usage']['ana_port_ids']
+        for entry in render_ctx[self.query]:
+            port_id = entry['id']
+            if port_id in non_ana_port_ids:
+                config[str(entry['index'])] = entry
+            if port_id in ana_port_ids:
+                new_index = ANA_PORT_INDEX_OFFSET + entry['index']
+                config[str(new_index)] = entry | {'index': new_index}
+        return config
+
+    def live_to_index(self, addr_trtype, addr_traddr, addr_trsvcid, render_ctx):
+        for entry in render_ctx['nvmet.port.query']:
+            if addr_trtype != entry['addr_trtype'] or str(addr_trsvcid) != str(entry['addr_trsvcid']):
+                continue
+            elif addr_traddr == entry['addr_traddr']:
+                return str(entry['index'])
+            elif addr_traddr == addr_traddr_to_address(entry['index'] + ANA_PORT_INDEX_OFFSET,
+                                                       entry['addr_trtype'],
+                                                       entry['addr_traddr'],
+                                                       render_ctx):
+                return str(entry['index'] + ANA_PORT_INDEX_OFFSET)
+
+    def live_address_to_key(self, laddr, render_ctx):
+        if index := self.live_to_index(laddr['trtype'], laddr['traddr'], laddr['trsvcid'], render_ctx):
+            return index
+        else:
+            match laddr['trtype']:
+                case 'RDMA' | 'TCP':
+                    return f"{laddr['trtype']}:{laddr['traddr']}:{laddr['trsvcid']}"
+                case _:
+                    # Keep a trailing colon here to simplify logic that depends on split()
+                    return f"{laddr['trtype']}:{laddr['traddr']}:"
+
+    def config_key(self, config_item, render_ctx):
+        return config_item['index']
+
+    def get_live(self, client, render_ctx):
+        return {self.live_address_to_key(entry['address'], render_ctx): entry
+                for entry in rpc.nvmf.nvmf_subsystem_get_listeners(client, nqn=NVMET_DISCOVERY_NQN)}
+
+    def add_to_nqn(self, client, config_item, nqn, render_ctx):
+        kwargs = {
+            'nqn': nqn,
+            'trtype': config_item['addr_trtype'],
+            'adrfam': PORT_ADDR_FAMILY.by_api(config_item['addr_adrfam']).spdk,
+            'traddr': addr_traddr_to_address(config_item['index'],
+                                             config_item['addr_trtype'],
+                                             config_item['addr_traddr'],
+                                             render_ctx),
+            'trsvcid': str(config_item['addr_trsvcid'])
+        }
+        # The API will generate the listen_address from its constituents - hence flat here
+        rpc.nvmf.nvmf_subsystem_add_listener(client, **kwargs)
+
+        if nqn != NVMET_DISCOVERY_NQN and config_item['index'] > ANA_PORT_INDEX_OFFSET:
+            kwargs['ana_state'] = ana_state(render_ctx)
+            kwargs.update({'anagrpid': ana_grpid(render_ctx)})
+            rpc.nvmf.nvmf_subsystem_listener_set_ana_state(client, **kwargs)
+
+    def add(self, client, config_item, render_ctx):
+        self.add_to_nqn(client, config_item, NVMET_DISCOVERY_NQN, render_ctx)
+
+    def address_match(self, config_item, live_address):
+        if config_item['addr_trtype'] != live_address['trtype'] or \
+                config_item['addr_traddr'] != live_address['traddr'] or \
+                str(config_item['addr_trsvcid']) != live_address['trsvcid']:
+            return False
+        return True
+
+    def update(self, client, config_item, live_item, render_ctx):
+        if not self.address_match(config_item, live_item['address']):
+            self.delete(client, live_item, render_ctx)
+            self.add(client, config_item, render_ctx)
+
+    def delete_from_nqn(self, client, laddr, nqn, render_ctx):
+        kwargs = {
+            'nqn': nqn,
+        }
+        kwargs.update(laddr)
+        rpc.nvmf.nvmf_subsystem_remove_listener(client, **kwargs)
+
+    def delete(self, client, live_item, render_ctx):
+        self.delete_from_nqn(client, live_item['address'], NVMET_DISCOVERY_NQN, render_ctx)
+class NvmetPortAnaReferralConfig(NvmetConfig):
+    """
+    Referrals are substantially different in SPDK than in the kernel
+    implementation.  They are global rather than attached to a
+    particular port.
+
+    Therefore we will just add referrals for each ANA port to its peer.
+    """
+    query = 'nvmet.port.query'
+
+    def config_dict(self, render_ctx):
+        config = {}
+        # If not HA then no peer referrals
+        if render_ctx['failover.node'] not in ('A', 'B'):
+            return config
+
+        # If ANA is enabled on a port then we want to add a referral to
+        # the peer port on the other node.
+        ana_port_ids = render_ctx['nvmet.port.usage']['ana_port_ids']
+        for entry in render_ctx[self.query]:
+            port_id = entry['id']
+            if port_id in ana_port_ids:
+                peer_addr = None
+                prefix = entry['addr_trtype'].lower()
+                choices = render_ctx[f'{prefix}.nvmet.port.transport_address_choices']
+                try:
+                    pair = choices[entry['addr_traddr']].split('/')
+                except KeyError:
+                    continue
+                match render_ctx['failover.node']:
+                    case 'A':
+                        peer_addr = pair[1]
+                    case 'B':
+                        peer_addr = pair[0]
+                if peer_addr:
+                    # Make the entry a valid listen address to simplify implementation of add()
+                    config[f"{entry['addr_trtype']}:{peer_addr}:{entry['addr_trsvcid']}"] = {
+                        'trtype': entry['addr_trtype'],
+                        'adrfam': PORT_ADDR_FAMILY.by_api(entry['addr_adrfam']).spdk,
+                        'traddr': peer_addr,
+                        'trsvcid': str(entry['addr_trsvcid']),
+                    }
+        return config
+
+    def add(self, client, config_item, render_ctx):
+        rpc.nvmf.nvmf_discovery_add_referral(client, **config_item)
+
+    def update(self, client, config_item, live_item, render_ctx):
+        # Update is a no-op because all the relevant data is in the key
+        pass
+
+    def delete(self, client, live_item, render_ctx):
+        rpc.nvmf.nvmf_discovery_remove_referral(client, **live_item)
+
+    def get_live(self, client, render_ctx):
+        result = {}
+        for entry in rpc.nvmf.nvmf_discovery_get_referrals(client):
+            laddr = entry['address']
+            result[f"{laddr['trtype']}:{laddr['traddr']}:{laddr['trsvcid']}"] = laddr
+        return result
+
+
+class NvmetPortSubsysConfig(NvmetPortConfig):
+    query = 'nvmet.port_subsys.query'
+
+    def config_dict(self, render_ctx):
+        # For ports we may want to inject or remove ports wrt the ANA
+        # settings.  ANA ports will be offset by ANA_PORT_INDEX_OFFSET (5000).
+        #
+        # For the general port setting we could use nvmet.port.usage, but
+        # per subsystem we need to be more specific.
+        config = {}
+        for entry in render_ctx[self.query]:
+            subnqn = entry['subsys']['subnqn']
+            if index := port_subsys_index(entry, render_ctx):
+                if index < ANA_PORT_INDEX_OFFSET:
+                    config[f"{index}:{subnqn}"] = entry
+                else:
+                    newentry = copy.deepcopy(entry)
+                    newentry['port']['index'] = index
+                    config[f"{index}:{subnqn}"] = newentry
+        return config
+
+    def config_key(self, config_item, render_ctx):
+        return f"{super().config_key(config_item['port'], render_ctx)}:{config_item['subsys']['subnqn']}"
+
+    def get_live(self, client, render_ctx):
+        result = {}
+        for subsys in rpc.nvmf.nvmf_get_subsystems(client):
+            if subsys['nqn'] == NVMET_DISCOVERY_NQN:
+                continue
+            for address in subsys['listen_addresses']:
+                port_key = self.live_address_to_key(address, render_ctx)
+                # Construct a synthetic live item that will facilitate delete when needed
+                result[f"{port_key}:{subsys['nqn']}"] = {'port': address, 'nqn': subsys['nqn']}
+        return result
+
+    def add(self, client, config_item, render_ctx):
+        self.add_to_nqn(client, config_item['port'], config_item['subsys']['subnqn'], render_ctx)
+
+    def update(self, client, config_item, live_item, render_ctx):
+        if not self.address_match(config_item['port'], live_item['port']):
+            self.delete(client, live_item, render_ctx)
+            self.add(client, config_item, render_ctx)
+
+    def delete(self, client, live_item, render_ctx):
+        self.delete_from_nqn(client, live_item['port'], live_item['nqn'], render_ctx)
+
+
+class NvmetKeyringDhchapKeyConfig(NvmetConfig):
+    """
+    We may have configured dhchap_key or dhchap_ctrl_key for each host.
+
+    Both will be derived from nvmet.host.query, but in different
+    classes - this one, and a subclass.
+    """
+    query = 'nvmet.host.query'
+    key_type = 'dhchap_key'
+
+    def config_key(self, config_item, render_ctx):
+        return host_config_key(config_item, self.key_type)
+
+    def _write_keyfile(self, key):
+        with tempfile.NamedTemporaryFile(mode="w+", dir=SPDK_KEY_DIR, delete=False) as tmp_file:
+            tmp_file.write(key)
+            return tmp_file.name
+
+    def get_live(self, client, render_ctx):
+        return {item['name']: item for item in rpc.keyring.keyring_get_keys(client)
+                if item['name'].startswith(f'{self.key_type}-')}
+
+    def add(self, client, config_item, render_ctx):
+        kwargs = {
+            'name': self.config_key(config_item, render_ctx),
+            'path': self._write_keyfile(config_item[self.key_type]),
+        }
+        rpc.keyring.keyring_file_add_key(client, **kwargs)
+
+    def update(self, client, config_item, live_item, render_ctx):
+        # Because the key name contains a hash of the secret, we only need
+        # to handle add and remove.
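A sketch of the keyring naming scheme: since the md5 of the secret is embedded in the key name, a rotated secret shows up as a brand-new key, and the reconciliation in render() reduces to add + delete with no in-place update. Host data below is made up:

```python
import hashlib

# Hypothetical host row as returned by nvmet.host.query.
host = {'hostnqn': 'nqn.2014-08.org.nvmexpress:uuid:1234',
        'dhchap_key': 'DHHC-1:00:example-secret:'}

digest = hashlib.md5(host['dhchap_key'].encode('utf-8')).hexdigest()
# Colons are replaced because they confuse downstream parsing.
name = f"dhchap_key-{host['hostnqn'].replace(':', '-')}-{digest}"
print(name)  # dhchap_key-nqn.2014-08.org.nvmexpress-uuid-1234-<md5 hex>
```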
+        pass
+
+    def delete(self, client, live_item, render_ctx):
+        rpc.keyring.keyring_file_remove_key(client, name=live_item['name'])
+        os.unlink(live_item['path'])
+
+
+class NvmetKeyringDhchapCtrlKeyConfig(NvmetKeyringDhchapKeyConfig):
+    query = 'nvmet.host.query'
+    key_type = 'dhchap_ctrl_key'
+
+
+class NvmetHostSubsysConfig(NvmetConfig):
+    query = 'nvmet.host_subsys.query'
+
+    def config_key(self, config_item, render_ctx):
+        return f"{config_item['host']['hostnqn']}:{config_item['subsys']['subnqn']}"
+
+    def get_live(self, client, render_ctx):
+        result = {}
+        for subsys in rpc.nvmf.nvmf_get_subsystems(client):
+            if subsys['nqn'] == NVMET_DISCOVERY_NQN:
+                continue
+            for host in subsys['hosts']:
+                hostnqn = host['nqn']
+                # Yes, deliberately mapped live dhchap_ctrlr_key to dhchap_ctrl_key here to
+                # make comparison in the update method easier
+                result[f"{hostnqn}:{subsys['nqn']}"] = {'hostnqn': hostnqn,
+                                                        'nqn': subsys['nqn'],
+                                                        'dhchap_key': host.get('dhchap_key'),
+                                                        'dhchap_ctrl_key': host.get('dhchap_ctrlr_key'),
+                                                        }
+        return result
+
+    def add(self, client, config_item, render_ctx):
+        kwargs = {
+            'nqn': config_item['subsys']['subnqn'],
+            'host': config_item['host']['hostnqn'],
+        }
+
+        if config_item['host']['dhchap_key']:
+            kwargs.update({'dhchap_key': host_config_key(config_item['host'], 'dhchap_key')})
+
+        if config_item['host']['dhchap_ctrl_key']:
+            # Yes, the SPDK name is different from the name in our config:
+            # dhchap_ctrlr_key vs dhchap_ctrl_key
+            kwargs.update({'dhchap_ctrlr_key': host_config_key(config_item['host'], 'dhchap_ctrl_key')})
+
+        # If this is a HA system then inject some settings
+        match render_ctx['failover.node']:
+            case 'A':
+                kwargs.update({'max_cntlid': NVMET_NODE_A_MAX_CONTROLLER_ID})
+            case 'B':
+                kwargs.update({'min_cntlid': NVMET_NODE_B_MIN_CONTROLLER_ID})
+
+        rpc.nvmf.nvmf_subsystem_add_host(client, **kwargs)
+
+    def update(self, client, config_item, live_item, render_ctx):
+        # We cannot update in place, so we need to remove and re-add the host
+        # if the contents are wrong.
+        config_host = config_item['host']
+        matches = True
+        for key_type in ('dhchap_key', 'dhchap_ctrl_key'):
+            if config_host[key_type] is None and live_item[key_type] is None:
+                continue
+            if config_host[key_type] is None or live_item[key_type] is None:
+                matches = False
+                break
+            if live_item[key_type] != host_config_key(config_host, key_type):
+                matches = False
+                break
+
+        if matches:
+            return
+
+        self.delete(client, live_item, render_ctx)
+        self.add(client, config_item, render_ctx)
+
+    def delete(self, client, live_item, render_ctx):
+        kwargs = {
+            'nqn': live_item['nqn'],
+            'host': live_item['hostnqn'],
+        }
+        rpc.nvmf.nvmf_subsystem_remove_host(client, **kwargs)
+
+
+class NvmetBdevConfig(NvmetConfig):
+    query = 'nvmet.namespace.query'
+
+    def config_key(self, config_item, render_ctx):
+        if subsys_visible(config_item['subsys'], render_ctx):
+            if render_ctx['failover.status'] != 'BACKUP':
+                return f"{config_item['device_type']}:{config_item['device_path']}"
+
+    def live_key(self, live_item):
+        match live_item['product_name']:
+            case 'URING bdev':
+                if filename := live_item.get('driver_specific', {}).get('uring', {}).get('filename'):
+                    if filename.startswith('/dev/zvol/'):
+                        return f'ZVOL:{filename[5:]}'
+            case 'AIO disk':
+                if filename := live_item.get('driver_specific', {}).get('aio', {}).get('filename'):
+                    if filename.startswith('/mnt'):
+                        return f'FILE:{filename}'
+            case 'Null disk':
+                return live_item['name']
+
+    def get_live(self, client, render_ctx):
+        result = {}
+        for entry in rpc.bdev.bdev_get_bdevs(client):
+            if key := self.live_key(entry):
+                result[key] = entry
+        return result
+
+    def bdev_name(self, config_item, render_ctx):
+        # Skip if we're the BACKUP in an HA pair
+        if render_ctx['failover.status'] == 'BACKUP':
+            return
+
+        match config_item['device_type']:
+            case NAMESPACE_DEVICE_TYPE.ZVOL.api:
+                return f"ZVOL:{config_item['device_path']}"
+
+            case NAMESPACE_DEVICE_TYPE.FILE.api:
+                return f"FILE:{config_item['device_path']}"
+
+    def add(self, client, config_item, render_ctx):
+        name = self.bdev_name(config_item, render_ctx)
+        if not name:
+            return
+
+        if render_ctx['failover.status'] == 'BACKUP':
+            rpc.bdev.bdev_null_create(client,
+                                      block_size=4096,
+                                      num_blocks=1,
+                                      name=name
+                                      )
+            return
+
+        match config_item['device_type']:
+            case NAMESPACE_DEVICE_TYPE.ZVOL.api:
+                rpc.bdev.bdev_uring_create(client,
+                                           filename=f"/dev/{config_item['device_path']}",
+                                           name=name
+                                           )
+
+            case NAMESPACE_DEVICE_TYPE.FILE.api:
+                _path = config_item['device_path']
+                rpc.bdev.bdev_aio_create(client,
+                                         filename=_path,
+                                         block_size=render_ctx.get('path_to_recordsize', {}).get(_path, 512),
+                                         name=name
+                                         )
+
+    def update(self, client, config_item, live_item, render_ctx):
+        pass
+
+    def delete(self, client, live_item, render_ctx):
+        match live_item['product_name']:
+            case 'URING bdev':
+                rpc.bdev.bdev_uring_delete(client, name=live_item['name'])
+
+            case 'AIO disk':
+                rpc.bdev.bdev_aio_delete(client, name=live_item['name'])
+
+            case 'Null disk':
+                rpc.bdev.bdev_null_delete(client, name=live_item['name'])
+
+
+class NvmetNamespaceConfig(NvmetBdevConfig):
+    query = 'nvmet.namespace.query'
+
+    def config_key(self, config_item, render_ctx):
+        if subsys_visible(config_item['subsys'], render_ctx):
+            name = self.bdev_name(config_item, render_ctx)
+            return f"{name}:{config_item['subsys']['subnqn']}:{config_item['nsid']}"
+
+    def get_live(self, client, render_ctx):
+        result = {}
+        for subsys in rpc.nvmf.nvmf_get_subsystems(client):
+            _nqn = subsys['nqn']
+            for ns in subsys.get('namespaces', []):
+                _nsid = ns['nsid']
+                key = f"{ns['bdev_name']}:{_nqn}:{_nsid}"
+                result[key] = {'nqn': _nqn, 'nsid': _nsid}
+        return result
+
+    def add(self, client, config_item, render_ctx):
+        name = self.bdev_name(config_item, render_ctx)
+        if not name:
+            return
+
+        # If HA always use the per-node anagrpid, just to save
+        # having to toggle it later if we toggle ANA
+        kwargs = {
+            'nqn': config_item['subsys']['subnqn'],
+            'bdev_name': name,
+            'uuid': config_item['device_uuid'],
+            'nguid': config_item['device_nguid'].replace('-', ''),
+            'anagrpid': ana_grpid(render_ctx),
+        }
+        if nsid := config_item.get('nsid'):
+            kwargs.update({'nsid': nsid})
+
+        rpc.nvmf.nvmf_subsystem_add_ns(client, **kwargs)
+
+    def delete(self, client, live_item, render_ctx):
+        kwargs = {
+            'nqn': live_item['nqn'],
+            'nsid': live_item['nsid']
+        }
+        rpc.nvmf.nvmf_subsystem_remove_ns(client, **kwargs)
+
+
+def make_client():
+    return rpc.client.JSONRPCClient(SPDK_RPC_SERVER_ADDR,
+                                    SPDK_RPC_PORT,
+                                    SPDK_RPC_TIMEOUT,
+                                    log_level=SPDK_RPC_LOG_LEVEL,
+                                    conn_retries=SPDK_RPC_CONN_RETRIES)
+
+
+def nvmf_subsystem_get_qpairs(client, nqn):
+    return rpc.nvmf.nvmf_subsystem_get_qpairs(client, nqn=nqn)
+
+
+class NvmetAnaStateConfig:
+
+    @contextmanager
+    def render(self, client, render_ctx: dict):
+        """
+        If we are making things inaccessible then do this before the yield,
+        otherwise after the yield.
+        """
+        new_state = ana_state(render_ctx)
+        _anagrpid = ana_grpid(render_ctx)
+
+        updates = []
+        for subsys in render_ctx['nvmet.subsys.query']:
+            if subsys_ana(subsys, render_ctx):
+                nqn = subsys['subnqn']
+                for listener in rpc.nvmf.nvmf_subsystem_get_listeners(client, nqn):
+                    if cur_state := next(filter(lambda x: x['ana_group'] == _anagrpid, listener['ana_states']), None):
+                        if cur_state['ana_state'] != new_state:
+                            kwargs = {
+                                'nqn': nqn,
+                                'ana_state': new_state,
+                                'anagrpid': _anagrpid
+                            }
+                            kwargs.update(listener['address'])
+                            updates.append(kwargs)
+
+        if new_state == ANA_INACCESSIBLE_STATE:
+            for kwargs in updates:
+                rpc.nvmf.nvmf_subsystem_listener_set_ana_state(client, **kwargs)
+
+        yield
+
+        if new_state == ANA_OPTIMIZED_STATE:
+            for kwargs in updates:
+                rpc.nvmf.nvmf_subsystem_listener_set_ana_state(client, **kwargs)
+
+
+def write_config(config):
+    client = make_client()
+
+    if not os.path.isdir(SPDK_KEY_DIR):
+        os.mkdir(SPDK_KEY_DIR)
+
+    # Render operations are context managers that do
+    # 1. Create-style operations
+    # 2. yield
+    # 3. Delete-style operations
+    #
+    # Therefore we can nest them to enforce the necessary
+    # order of operations.
+    #
+    # SPDK automatically does what we had called cross-port
+    # referrals in the kernel implementation.
+    with (
+        NvmetSubsysConfig().render(client, config),
+        NvmetTransportConfig().render(client, config),
+        NvmetKeyringDhchapKeyConfig().render(client, config),
+        NvmetKeyringDhchapCtrlKeyConfig().render(client, config),
+        NvmetPortConfig().render(client, config),
+        NvmetPortAnaReferralConfig().render(client, config),
+        NvmetHostSubsysConfig().render(client, config),
+        NvmetPortSubsysConfig().render(client, config),
+        NvmetAnaStateConfig().render(client, config),
+        NvmetBdevConfig().render(client, config),
+        NvmetNamespaceConfig().render(client, config),
+    ):
+        pass
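A final sketch of the ordering guarantee NvmetAnaStateConfig.render() provides during failover transitions (a hypothetical print-based stand-in): going inaccessible must happen before the nested creates/updates run (before the yield), while going optimized must happen only after they complete (after the yield), so clients never see an optimized path to a half-configured target.

```python
from contextlib import contextmanager

@contextmanager
def ana_transition(new_state: str):
    if new_state == 'inaccessible':
        print('set ana_state=inaccessible (before reconfig)')
    yield
    if new_state == 'optimized':
        print('set ana_state=optimized (after reconfig)')

with ana_transition('optimized'):
    print('reconfigure listeners/namespaces')
# reconfigure listeners/namespaces
# set ana_state=optimized (after reconfig)
```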