Skip to content

Commit

Permalink
MacOS improvements (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
hamdanal committed Jan 1, 2024
1 parent 654a3b3 commit f70eccb
Showing 1 changed file with 27 additions and 31 deletions.
58 changes: 27 additions & 31 deletions lsports/_macos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import ctypes
from collections.abc import Iterable, Iterator
from ctypes import byref, create_string_buffer

from lsports._common import PortInfo
Expand Down Expand Up @@ -43,7 +44,7 @@
iokit.IOServiceGetMatchingServices.restype = kern_return_t

iokit.IORegistryEntryGetParentEntry.argtypes = (ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p)
iokit.IOServiceGetMatchingServices.restype = kern_return_t
iokit.IORegistryEntryGetParentEntry.restype = kern_return_t

iokit.IORegistryEntryCreateCFProperty.argtypes = (
ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint32
Expand Down Expand Up @@ -187,74 +188,69 @@ def GetParentDeviceByType(device: ctypes._CData, parent_type: bytes) -> ctypes._
return device


def GetIOServicesByType(service_type: bytes) -> list[ctypes._CData]:
def GetIOServicesByType(service_type: bytes) -> Iterator[ctypes._CData]:
"""List specified ``service_type``."""
serial_port_iterator = ctypes.c_void_p()
iokit.IOServiceGetMatchingServices(
kIOMasterPortDefault, iokit.IOServiceMatching(service_type), byref(serial_port_iterator)
)
services = []
while iokit.IOIteratorIsValid(serial_port_iterator):
service = iokit.IOIteratorNext(serial_port_iterator)
if not service:
break
services.append(service)
yield service
iokit.IOObjectRelease(serial_port_iterator)
return services


def location_to_string(locationID: int) -> str:
def location_to_string(location_id: int) -> str:
"""Helper to calculate port and bus number from locationID."""
loc = [f"{locationID >> 24}-"]
while locationID & 0xF00000:
loc = [f"{location_id >> 24}-"]
while location_id & 0xF00000:
if len(loc) > 1:
loc.append(".")
loc.append(f"{(locationID >> 20) & 0xf}")
locationID <<= 4
loc.append(f"{(location_id >> 20) & 0xf}")
location_id <<= 4
return "".join(loc)


class SuitableSerialInterface:
class Interface:
"""A class to hold ID and name of serial interfaces."""

def __init__(self, id: int | None, name: str | None) -> None:
self.id = id
self.name = name


def scan_interfaces() -> list[SuitableSerialInterface]:
"""Helper function to scan USB interfaces.
Returns:
A list of SuitableSerialInterface objects with name and id attributes.
"""
def scan_interfaces() -> tuple[list[ctypes._CData], list[Interface]]:
"""Get the list of services and the list of interface names and ids."""
services = []
interfaces = []
for service in GetIOServicesByType(b"IOSerialBSDClient"):
services.append(service)
device = get_string_property(service, b"IOCalloutDevice")
if device:
usb_device = GetParentDeviceByType(service, b"IOUSBInterface")
if usb_device:
name = get_string_property(usb_device, b"USB Interface Name") or None
locationID = (
location_id = (
get_int_property(usb_device, b"locationID", kCFNumberSInt32Type) or None
)
interfaces.append(SuitableSerialInterface(locationID, name))
return interfaces
interfaces.append(Interface(location_id, name))
return services, interfaces


def search_for_locationID_in_interfaces(
serial_interfaces: list[SuitableSerialInterface], locationID: int
) -> str | None:
for interface in serial_interfaces:
if interface.id == locationID:
def get_interface_from_location(interfaces: Iterable[Interface], location_id: int) -> str | None:
for interface in interfaces:
if interface.id == location_id:
return interface.name
return None


def comports(include_links: bool = False) -> list[PortInfo]:
# include_links is currently ignored. Are links in /dev even supported here?
# Scan for all iokit serial ports
services = GetIOServicesByType(b"IOSerialBSDClient")
services, serial_interfaces = scan_interfaces()
ports = []
serial_interfaces = scan_interfaces()
for service in services:
# First, add the callout device file.
device = get_string_property(service, b"IOCalloutDevice")
Expand All @@ -273,10 +269,10 @@ def comports(include_links: bool = False) -> list[PortInfo]:
# to the usb product name string descriptor.
info.product = IORegistryEntryGetName(usb_device) or "n/a"
info.manufacturer = get_string_property(usb_device, kUSBVendorString)
locationID = get_int_property(usb_device, b"locationID", kCFNumberSInt32Type)
assert locationID is not None
info.location = location_to_string(locationID)
info.interface = search_for_locationID_in_interfaces(serial_interfaces, locationID)
location_id = get_int_property(usb_device, b"locationID", kCFNumberSInt32Type)
assert location_id is not None
info.location = location_to_string(location_id)
info.interface = get_interface_from_location(serial_interfaces, location_id)
info.apply_usb_info()
ports.append(info)
return ports

0 comments on commit f70eccb

Please sign in to comment.