Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MacOS improvements #12

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 27 additions & 31 deletions lsports/_macos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import ctypes
from collections.abc import Iterable, Iterator
from ctypes import byref, create_string_buffer

from lsports._common import PortInfo
Expand Down Expand Up @@ -43,7 +44,7 @@
iokit.IOServiceGetMatchingServices.restype = kern_return_t

iokit.IORegistryEntryGetParentEntry.argtypes = (ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p)
iokit.IOServiceGetMatchingServices.restype = kern_return_t
iokit.IORegistryEntryGetParentEntry.restype = kern_return_t

iokit.IORegistryEntryCreateCFProperty.argtypes = (
ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint32
Expand Down Expand Up @@ -187,74 +188,69 @@ def GetParentDeviceByType(device: ctypes._CData, parent_type: bytes) -> ctypes._
return device


def GetIOServicesByType(service_type: bytes) -> list[ctypes._CData]:
def GetIOServicesByType(service_type: bytes) -> Iterator[ctypes._CData]:
"""List specified ``service_type``."""
serial_port_iterator = ctypes.c_void_p()
iokit.IOServiceGetMatchingServices(
kIOMasterPortDefault, iokit.IOServiceMatching(service_type), byref(serial_port_iterator)
)
services = []
while iokit.IOIteratorIsValid(serial_port_iterator):
service = iokit.IOIteratorNext(serial_port_iterator)
if not service:
break
services.append(service)
yield service
iokit.IOObjectRelease(serial_port_iterator)
return services


def location_to_string(locationID: int) -> str:
def location_to_string(location_id: int) -> str:
"""Helper to calculate port and bus number from locationID."""
loc = [f"{locationID >> 24}-"]
while locationID & 0xF00000:
loc = [f"{location_id >> 24}-"]
while location_id & 0xF00000:
if len(loc) > 1:
loc.append(".")
loc.append(f"{(locationID >> 20) & 0xf}")
locationID <<= 4
loc.append(f"{(location_id >> 20) & 0xf}")
location_id <<= 4
return "".join(loc)


class SuitableSerialInterface:
class Interface:
"""A class to hold ID and name of serial interfaces."""

def __init__(self, id: int | None, name: str | None) -> None:
self.id = id
self.name = name


def scan_interfaces() -> list[SuitableSerialInterface]:
"""Helper function to scan USB interfaces.

Returns:
A list of SuitableSerialInterface objects with name and id attributes.
"""
def scan_interfaces() -> tuple[list[ctypes._CData], list[Interface]]:
"""Get the list of services and the list of interface names and ids."""
services = []
interfaces = []
for service in GetIOServicesByType(b"IOSerialBSDClient"):
services.append(service)
device = get_string_property(service, b"IOCalloutDevice")
if device:
usb_device = GetParentDeviceByType(service, b"IOUSBInterface")
if usb_device:
name = get_string_property(usb_device, b"USB Interface Name") or None
locationID = (
location_id = (
get_int_property(usb_device, b"locationID", kCFNumberSInt32Type) or None
)
interfaces.append(SuitableSerialInterface(locationID, name))
return interfaces
interfaces.append(Interface(location_id, name))
return services, interfaces


def search_for_locationID_in_interfaces(
serial_interfaces: list[SuitableSerialInterface], locationID: int
) -> str | None:
for interface in serial_interfaces:
if interface.id == locationID:
def get_interface_from_location(interfaces: Iterable[Interface], location_id: int) -> str | None:
for interface in interfaces:
if interface.id == location_id:
return interface.name
return None


def comports(include_links: bool = False) -> list[PortInfo]:
# include_links is currently ignored. Are links in /dev even supported here?
# Scan for all iokit serial ports
services = GetIOServicesByType(b"IOSerialBSDClient")
services, serial_interfaces = scan_interfaces()
ports = []
serial_interfaces = scan_interfaces()
for service in services:
# First, add the callout device file.
device = get_string_property(service, b"IOCalloutDevice")
Expand All @@ -273,10 +269,10 @@ def comports(include_links: bool = False) -> list[PortInfo]:
# to the usb product name string descriptor.
info.product = IORegistryEntryGetName(usb_device) or "n/a"
info.manufacturer = get_string_property(usb_device, kUSBVendorString)
locationID = get_int_property(usb_device, b"locationID", kCFNumberSInt32Type)
assert locationID is not None
info.location = location_to_string(locationID)
info.interface = search_for_locationID_in_interfaces(serial_interfaces, locationID)
location_id = get_int_property(usb_device, b"locationID", kCFNumberSInt32Type)
assert location_id is not None
info.location = location_to_string(location_id)
info.interface = get_interface_from_location(serial_interfaces, location_id)
info.apply_usb_info()
ports.append(info)
return ports