From f70eccb89a7c711be98a3947bb2ef8c81ef5ae50 Mon Sep 17 00:00:00 2001 From: Ali Hamdan Date: Mon, 1 Jan 2024 22:06:07 +0100 Subject: [PATCH] MacOS improvements (#12) --- lsports/_macos.py | 58 ++++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/lsports/_macos.py b/lsports/_macos.py index e5ccb11..c0deb3d 100644 --- a/lsports/_macos.py +++ b/lsports/_macos.py @@ -4,6 +4,7 @@ from __future__ import annotations import ctypes +from collections.abc import Iterable, Iterator from ctypes import byref, create_string_buffer from lsports._common import PortInfo @@ -43,7 +44,7 @@ iokit.IOServiceGetMatchingServices.restype = kern_return_t iokit.IORegistryEntryGetParentEntry.argtypes = (ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p) -iokit.IOServiceGetMatchingServices.restype = kern_return_t +iokit.IORegistryEntryGetParentEntry.restype = kern_return_t iokit.IORegistryEntryCreateCFProperty.argtypes = ( ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint32 @@ -187,64 +188,60 @@ def GetParentDeviceByType(device: ctypes._CData, parent_type: bytes) -> ctypes._ return device -def GetIOServicesByType(service_type: bytes) -> list[ctypes._CData]: +def GetIOServicesByType(service_type: bytes) -> Iterator[ctypes._CData]: """List specified ``service_type``.""" serial_port_iterator = ctypes.c_void_p() iokit.IOServiceGetMatchingServices( kIOMasterPortDefault, iokit.IOServiceMatching(service_type), byref(serial_port_iterator) ) - services = [] while iokit.IOIteratorIsValid(serial_port_iterator): service = iokit.IOIteratorNext(serial_port_iterator) if not service: break - services.append(service) + yield service iokit.IOObjectRelease(serial_port_iterator) - return services -def location_to_string(locationID: int) -> str: +def location_to_string(location_id: int) -> str: """Helper to calculate port and bus number from locationID.""" - loc = [f"{locationID >> 24}-"] - while locationID & 0xF00000: + loc = [f"{location_id >> 24}-"] + while location_id & 0xF00000: if len(loc) > 1: loc.append(".") - loc.append(f"{(locationID >> 20) & 0xf}") - locationID <<= 4 + loc.append(f"{(location_id >> 20) & 0xf}") + location_id <<= 4 return "".join(loc) -class SuitableSerialInterface: +class Interface: + """A class to hold ID and name of serial interfaces.""" + def __init__(self, id: int | None, name: str | None) -> None: self.id = id self.name = name -def scan_interfaces() -> list[SuitableSerialInterface]: - """Helper function to scan USB interfaces. - - Returns: - A list of SuitableSerialInterface objects with name and id attributes. - """ +def scan_interfaces() -> tuple[list[ctypes._CData], list[Interface]]: + """Get the list of services and the list of interface names and ids.""" + services = [] interfaces = [] for service in GetIOServicesByType(b"IOSerialBSDClient"): + services.append(service) device = get_string_property(service, b"IOCalloutDevice") if device: usb_device = GetParentDeviceByType(service, b"IOUSBInterface") if usb_device: name = get_string_property(usb_device, b"USB Interface Name") or None - locationID = ( + location_id = ( get_int_property(usb_device, b"locationID", kCFNumberSInt32Type) or None ) - interfaces.append(SuitableSerialInterface(locationID, name)) - return interfaces + interfaces.append(Interface(location_id, name)) + return services, interfaces -def search_for_locationID_in_interfaces( - serial_interfaces: list[SuitableSerialInterface], locationID: int -) -> str | None: - for interface in serial_interfaces: - if interface.id == locationID: +def get_interface_from_location(interfaces: Iterable[Interface], location_id: int) -> str | None: + for interface in interfaces: + if interface.id == location_id: return interface.name return None @@ -252,9 +249,8 @@ def search_for_locationID_in_interfaces( def comports(include_links: bool = False) -> list[PortInfo]: # include_links is currently ignored. Are links in /dev even supported here? # Scan for all iokit serial ports - services = GetIOServicesByType(b"IOSerialBSDClient") + services, serial_interfaces = scan_interfaces() ports = [] - serial_interfaces = scan_interfaces() for service in services: # First, add the callout device file. device = get_string_property(service, b"IOCalloutDevice") @@ -273,10 +269,10 @@ def comports(include_links: bool = False) -> list[PortInfo]: # to the usb product name string descriptor. info.product = IORegistryEntryGetName(usb_device) or "n/a" info.manufacturer = get_string_property(usb_device, kUSBVendorString) - locationID = get_int_property(usb_device, b"locationID", kCFNumberSInt32Type) - assert locationID is not None - info.location = location_to_string(locationID) - info.interface = search_for_locationID_in_interfaces(serial_interfaces, locationID) + location_id = get_int_property(usb_device, b"locationID", kCFNumberSInt32Type) + assert location_id is not None + info.location = location_to_string(location_id) + info.interface = get_interface_from_location(serial_interfaces, location_id) info.apply_usb_info() ports.append(info) return ports