diff --git a/homeassistant/components/homeassistant_connect_zbt2/config_flow.py b/homeassistant/components/homeassistant_connect_zbt2/config_flow.py index 1d95601211e..ca5a4ae307e 100644 --- a/homeassistant/components/homeassistant_connect_zbt2/config_flow.py +++ b/homeassistant/components/homeassistant_connect_zbt2/config_flow.py @@ -7,11 +7,18 @@ from typing import TYPE_CHECKING, Any, Protocol from homeassistant.components import usb from homeassistant.components.homeassistant_hardware import firmware_config_flow +from homeassistant.components.homeassistant_hardware.helpers import ( + HardwareFirmwareDiscoveryInfo, +) from homeassistant.components.homeassistant_hardware.util import ( ApplicationType, FirmwareInfo, ResetTarget, ) +from homeassistant.components.usb import ( + usb_service_info_from_device, + usb_unique_id_from_service_info, +) from homeassistant.config_entries import ( ConfigEntry, ConfigEntryBaseFlow, @@ -123,22 +130,16 @@ class HomeAssistantConnectZBT2ConfigFlow( async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult: """Handle usb discovery.""" - device = discovery_info.device - vid = discovery_info.vid - pid = discovery_info.pid - serial_number = discovery_info.serial_number - manufacturer = discovery_info.manufacturer - description = discovery_info.description - unique_id = f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}" + unique_id = usb_unique_id_from_service_info(discovery_info) - device = discovery_info.device = await self.hass.async_add_executor_job( + discovery_info.device = await self.hass.async_add_executor_job( usb.get_serial_by_id, discovery_info.device ) try: await self.async_set_unique_id(unique_id) finally: - self._abort_if_unique_id_configured(updates={DEVICE: device}) + self._abort_if_unique_id_configured(updates={DEVICE: discovery_info.device}) self._usb_info = discovery_info @@ -148,6 +149,24 @@ class HomeAssistantConnectZBT2ConfigFlow( return await self.async_step_confirm() + async def async_step_import( + self, fw_discovery_info: HardwareFirmwareDiscoveryInfo + ) -> ConfigFlowResult: + """Handle import from ZHA/OTBR firmware notification.""" + assert fw_discovery_info["usb_device"] is not None + usb_info = usb_service_info_from_device(fw_discovery_info["usb_device"]) + unique_id = usb_unique_id_from_service_info(usb_info) + + if await self.async_set_unique_id(unique_id, raise_on_progress=False): + self._abort_if_unique_id_configured(updates={DEVICE: usb_info.device}) + + self._usb_info = usb_info + self._device = usb_info.device + self._hardware_name = HARDWARE_NAME + self._probed_firmware_info = fw_discovery_info["firmware_info"] + + return self._async_flow_finished() + def _async_flow_finished(self) -> ConfigFlowResult: """Create the config entry.""" assert self._usb_info is not None diff --git a/homeassistant/components/homeassistant_hardware/const.py b/homeassistant/components/homeassistant_hardware/const.py index a3c091ff7ee..5b5b509ae1f 100644 --- a/homeassistant/components/homeassistant_hardware/const.py +++ b/homeassistant/components/homeassistant_hardware/const.py @@ -19,6 +19,12 @@ DATA_COMPONENT: HassKey[HardwareInfoDispatcher] = HassKey(DOMAIN) ZHA_DOMAIN = "zha" OTBR_DOMAIN = "otbr" +HARDWARE_INTEGRATION_DOMAINS = { + "homeassistant_sky_connect", + "homeassistant_connect_zbt2", + "homeassistant_yellow", +} + OTBR_ADDON_NAME = "OpenThread Border Router" OTBR_ADDON_MANAGER_DATA = "openthread_border_router" OTBR_ADDON_SLUG = "core_openthread_border_router" diff --git a/homeassistant/components/homeassistant_hardware/helpers.py b/homeassistant/components/homeassistant_hardware/helpers.py index 57558a1b0e7..51932ee3ee6 100644 --- a/homeassistant/components/homeassistant_hardware/helpers.py +++ b/homeassistant/components/homeassistant_hardware/helpers.py @@ -6,19 +6,33 @@ from collections import defaultdict from collections.abc import AsyncIterator, Awaitable, Callable from contextlib import asynccontextmanager import logging -from typing import TYPE_CHECKING, Protocol +from typing import TYPE_CHECKING, Protocol, TypedDict -from homeassistant.config_entries import ConfigEntry +from homeassistant.components.usb import ( + USBDevice, + async_get_usb_matchers_for_device, + usb_device_from_path, +) +from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback from . import DATA_COMPONENT +from .const import HARDWARE_INTEGRATION_DOMAINS if TYPE_CHECKING: from .util import FirmwareInfo + _LOGGER = logging.getLogger(__name__) +class HardwareFirmwareDiscoveryInfo(TypedDict): + """Data for triggering hardware integration discovery via firmware notification.""" + + usb_device: USBDevice | None + firmware_info: FirmwareInfo + + class SyncHardwareFirmwareInfoModule(Protocol): """Protocol type for Home Assistant Hardware firmware info platform modules.""" @@ -46,6 +60,23 @@ type HardwareFirmwareInfoModule = ( ) +@hass_callback +def async_get_hardware_domain_for_usb_device( + hass: HomeAssistant, usb_device: USBDevice +) -> str | None: + """Identify which hardware domain should handle a USB device.""" + matched = async_get_usb_matchers_for_device(hass, usb_device) + hw_domains = {match["domain"] for match in matched} & HARDWARE_INTEGRATION_DOMAINS + + if not hw_domains: + return None + + # We can never have two hardware integrations overlap in discovery + assert len(hw_domains) == 1 + + return list(hw_domains)[0] + + class HardwareInfoDispatcher: """Central dispatcher for hardware/firmware information.""" @@ -94,7 +125,7 @@ class HardwareInfoDispatcher: "Received firmware info notification from %r: %s", domain, firmware_info ) - for callback in self._notification_callbacks.get(firmware_info.device, []): + for callback in list(self._notification_callbacks[firmware_info.device]): try: callback(firmware_info) except Exception: @@ -102,6 +133,48 @@ class HardwareInfoDispatcher: "Error while notifying firmware info listener %s", callback ) + await self._async_trigger_hardware_discovery(firmware_info) + + async def _async_trigger_hardware_discovery( + self, firmware_info: FirmwareInfo + ) -> None: + """Trigger hardware integration config flows from firmware info. + + Identifies which hardware integration should handle the device based on + USB matchers, then triggers an import flow for only that integration. + """ + + usb_device = await self.hass.async_add_executor_job( + usb_device_from_path, firmware_info.device + ) + + if usb_device is None: + _LOGGER.debug("Cannot find USB for path %s", firmware_info.device) + return + + hardware_domain = async_get_hardware_domain_for_usb_device( + self.hass, usb_device + ) + + if hardware_domain is None: + _LOGGER.debug("No hardware integration found for device %s", usb_device) + return + + _LOGGER.debug( + "Triggering %s import flow for device %s", + hardware_domain, + firmware_info.device, + ) + + await self.hass.config_entries.flow.async_init( + hardware_domain, + context={"source": SOURCE_IMPORT}, + data=HardwareFirmwareDiscoveryInfo( + usb_device=usb_device, + firmware_info=firmware_info, + ), + ) + async def iter_firmware_info(self) -> AsyncIterator[FirmwareInfo]: """Iterate over all firmware information for all hardware.""" for domain, fw_info_module in self._providers.items(): diff --git a/homeassistant/components/homeassistant_hardware/manifest.json b/homeassistant/components/homeassistant_hardware/manifest.json index 192aecc93bf..01c27300d71 100644 --- a/homeassistant/components/homeassistant_hardware/manifest.json +++ b/homeassistant/components/homeassistant_hardware/manifest.json @@ -3,6 +3,7 @@ "name": "Home Assistant Hardware", "after_dependencies": ["hassio"], "codeowners": ["@home-assistant/core"], + "dependencies": ["usb"], "documentation": "https://www.home-assistant.io/integrations/homeassistant_hardware", "integration_type": "system", "requirements": [ diff --git a/homeassistant/components/homeassistant_sky_connect/config_flow.py b/homeassistant/components/homeassistant_sky_connect/config_flow.py index 7a9eff0b741..1d8095d80a9 100644 --- a/homeassistant/components/homeassistant_sky_connect/config_flow.py +++ b/homeassistant/components/homeassistant_sky_connect/config_flow.py @@ -10,10 +10,17 @@ from homeassistant.components.homeassistant_hardware import ( firmware_config_flow, silabs_multiprotocol_addon, ) +from homeassistant.components.homeassistant_hardware.helpers import ( + HardwareFirmwareDiscoveryInfo, +) from homeassistant.components.homeassistant_hardware.util import ( ApplicationType, FirmwareInfo, ) +from homeassistant.components.usb import ( + usb_service_info_from_device, + usb_unique_id_from_service_info, +) from homeassistant.config_entries import ( ConfigEntry, ConfigEntryBaseFlow, @@ -142,16 +149,10 @@ class HomeAssistantSkyConnectConfigFlow( async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult: """Handle usb discovery.""" - device = discovery_info.device - vid = discovery_info.vid - pid = discovery_info.pid - serial_number = discovery_info.serial_number - manufacturer = discovery_info.manufacturer - description = discovery_info.description - unique_id = f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}" + unique_id = usb_unique_id_from_service_info(discovery_info) if await self.async_set_unique_id(unique_id): - self._abort_if_unique_id_configured(updates={DEVICE: device}) + self._abort_if_unique_id_configured(updates={DEVICE: discovery_info.device}) discovery_info.device = await self.hass.async_add_executor_job( usb.get_serial_by_id, discovery_info.device @@ -159,8 +160,10 @@ class HomeAssistantSkyConnectConfigFlow( self._usb_info = discovery_info - assert description is not None - self._hw_variant = HardwareVariant.from_usb_product_name(description) + assert discovery_info.description is not None + self._hw_variant = HardwareVariant.from_usb_product_name( + discovery_info.description + ) # Set parent class attributes self._device = self._usb_info.device @@ -168,6 +171,26 @@ class HomeAssistantSkyConnectConfigFlow( return await self.async_step_confirm() + async def async_step_import( + self, fw_discovery_info: HardwareFirmwareDiscoveryInfo + ) -> ConfigFlowResult: + """Handle import from ZHA/OTBR firmware notification.""" + assert fw_discovery_info["usb_device"] is not None + usb_info = usb_service_info_from_device(fw_discovery_info["usb_device"]) + unique_id = usb_unique_id_from_service_info(usb_info) + + if await self.async_set_unique_id(unique_id, raise_on_progress=False): + self._abort_if_unique_id_configured(updates={DEVICE: usb_info.device}) + + self._usb_info = usb_info + assert usb_info.description is not None + self._hw_variant = HardwareVariant.from_usb_product_name(usb_info.description) + self._device = usb_info.device + self._hardware_name = self._hw_variant.full_name + self._probed_firmware_info = fw_discovery_info["firmware_info"] + + return self._async_flow_finished() + def _async_flow_finished(self) -> ConfigFlowResult: """Create the config entry.""" assert self._usb_info is not None diff --git a/homeassistant/components/homeassistant_yellow/__init__.py b/homeassistant/components/homeassistant_yellow/__init__.py index 27c40e35946..07fe496b049 100644 --- a/homeassistant/components/homeassistant_yellow/__init__.py +++ b/homeassistant/components/homeassistant_yellow/__init__.py @@ -41,6 +41,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: firmware = ApplicationType(entry.data[FIRMWARE]) + # Auto start the multiprotocol addon if it is in use if firmware is ApplicationType.CPC: try: await check_multi_pan_addon(hass) diff --git a/homeassistant/components/usb/__init__.py b/homeassistant/components/usb/__init__.py index 90433b0f728..104ba82c6f0 100644 --- a/homeassistant/components/usb/__init__.py +++ b/homeassistant/components/usb/__init__.py @@ -6,7 +6,6 @@ import asyncio from collections.abc import Callable, Coroutine, Sequence import dataclasses from datetime import datetime, timedelta -import fnmatch from functools import partial import logging import os @@ -43,7 +42,11 @@ from .const import DOMAIN from .models import USBDevice from .utils import ( scan_serial_ports, + usb_device_from_path, # noqa: F401 usb_device_from_port, # noqa: F401 + usb_device_matches_matcher, + usb_service_info_from_device, # noqa: F401 + usb_unique_id_from_service_info, # noqa: F401 ) _LOGGER = logging.getLogger(__name__) @@ -121,7 +124,7 @@ def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> boo usb_discovery: USBDiscovery = hass.data[DOMAIN] return any( - _is_matching( + usb_device_matches_matcher( USBDevice( device=device, vid=vid, @@ -143,6 +146,15 @@ def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> boo ) +@hass_callback +def async_get_usb_matchers_for_device( + hass: HomeAssistant, device: USBDevice +) -> list[USBMatcher]: + """Return a list of matchers that match the given device.""" + usb_discovery: USBDiscovery = hass.data[DOMAIN] + return usb_discovery.async_get_usb_matchers_for_device(device) + + _DEPRECATED_UsbServiceInfo = DeprecatedConstant( _UsbServiceInfo, "homeassistant.helpers.service_info.usb.UsbServiceInfo", @@ -214,34 +226,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return True -def _fnmatch_lower(name: str | None, pattern: str) -> bool: - """Match a lowercase version of the name.""" - if name is None: - return False - return fnmatch.fnmatch(name.lower(), pattern) - - -def _is_matching(device: USBDevice, matcher: USBMatcher | USBCallbackMatcher) -> bool: - """Return True if a device matches.""" - if "vid" in matcher and device.vid != matcher["vid"]: - return False - if "pid" in matcher and device.pid != matcher["pid"]: - return False - if "serial_number" in matcher and not _fnmatch_lower( - device.serial_number, matcher["serial_number"] - ): - return False - if "manufacturer" in matcher and not _fnmatch_lower( - device.manufacturer, matcher["manufacturer"] - ): - return False - if "description" in matcher and not _fnmatch_lower( - device.description, matcher["description"] - ): - return False - return True - - async def async_request_scan(hass: HomeAssistant) -> None: """Request a USB scan.""" usb_discovery: USBDiscovery = hass.data[DOMAIN] @@ -383,6 +367,29 @@ class USBDiscovery: return _async_remove_callback + @hass_callback + def async_get_usb_matchers_for_device(self, device: USBDevice) -> list[USBMatcher]: + """Return a list of matchers that match the given device.""" + matched = [ + matcher + for matcher in self.usb + if usb_device_matches_matcher(device, matcher) + ] + + if not matched: + return [] + + # Sort by specificity (most fields matched first) + sorted_by_most_targeted = sorted(matched, key=lambda item: -len(item)) + + # Only return matchers with the same specificity as the most specific one + most_matched_fields = len(sorted_by_most_targeted[0]) + return [ + matcher + for matcher in sorted_by_most_targeted + if len(matcher) == most_matched_fields + ] + async def _async_process_discovered_usb_device(self, device: USBDevice) -> None: """Process a USB discovery.""" _LOGGER.debug("Discovered USB Device: %s", device) @@ -391,21 +398,13 @@ class USBDiscovery: return self.seen.add(device_tuple) - matched = [matcher for matcher in self.usb if _is_matching(device, matcher)] + matched = self.async_get_usb_matchers_for_device(device) if not matched: return service_info: _UsbServiceInfo | None = None - sorted_by_most_targeted = sorted(matched, key=lambda item: -len(item)) - most_matched_fields = len(sorted_by_most_targeted[0]) - - for matcher in sorted_by_most_targeted: - # If there is a less targeted match, we only - # want the most targeted match - if len(matcher) < most_matched_fields: - break - + for matcher in matched: if service_info is None: service_info = _UsbServiceInfo( device=await self.hass.async_add_executor_job( diff --git a/homeassistant/components/usb/utils.py b/homeassistant/components/usb/utils.py index 1bb620ec5f7..b5c78fa5f98 100644 --- a/homeassistant/components/usb/utils.py +++ b/homeassistant/components/usb/utils.py @@ -3,10 +3,15 @@ from __future__ import annotations from collections.abc import Sequence +import fnmatch +import os from serial.tools.list_ports import comports from serial.tools.list_ports_common import ListPortInfo +from homeassistant.helpers.service_info.usb import UsbServiceInfo +from homeassistant.loader import USBMatcher + from .models import USBDevice @@ -29,3 +34,89 @@ def scan_serial_ports() -> Sequence[USBDevice]: for port in comports() if port.vid is not None or port.pid is not None ] + + +def usb_device_from_path(device_path: str) -> USBDevice | None: + """Get USB device info from a device path.""" + + # Scan all symlinks first + by_id = "/dev/serial/by-id" + realpath_to_by_id: dict[str, str] = {} + if os.path.isdir(by_id): + for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()): + realpath_to_by_id[os.path.realpath(path)] = path + + # Then compare the actual path to each serial port's + device_path_real = os.path.realpath(device_path) + + for device in scan_serial_ports(): + normalized_path = realpath_to_by_id.get(device.device, device.device) + if ( + normalized_path == device_path + or os.path.realpath(device.device) == device_path_real + ): + return USBDevice( + device=normalized_path, + vid=device.vid, + pid=device.pid, + serial_number=device.serial_number, + manufacturer=device.manufacturer, + description=device.description, + ) + + return None + + +def _fnmatch_lower(name: str | None, pattern: str) -> bool: + """Match a lowercase version of the name.""" + if name is None: + return False + return fnmatch.fnmatch(name.lower(), pattern) + + +def usb_device_matches_matcher(device: USBDevice, matcher: USBMatcher) -> bool: + """Check if a USB device matches a USB matcher.""" + if "vid" in matcher and device.vid != matcher["vid"]: + return False + + if "pid" in matcher and device.pid != matcher["pid"]: + return False + + if "serial_number" in matcher and not _fnmatch_lower( + device.serial_number, matcher["serial_number"] + ): + return False + + if "manufacturer" in matcher and not _fnmatch_lower( + device.manufacturer, matcher["manufacturer"] + ): + return False + + if "description" in matcher and not _fnmatch_lower( + device.description, matcher["description"] + ): + return False + + return True + + +def usb_unique_id_from_service_info(usb_info: UsbServiceInfo) -> str: + """Generate a unique ID from USB service info.""" + return ( + f"{usb_info.vid}:{usb_info.pid}_" + f"{usb_info.serial_number}_" + f"{usb_info.manufacturer}_" + f"{usb_info.description}" + ) + + +def usb_service_info_from_device(usb_device: USBDevice) -> UsbServiceInfo: + """Convert a USBDevice to UsbServiceInfo.""" + return UsbServiceInfo( + device=usb_device.device, + vid=usb_device.vid, + pid=usb_device.pid, + serial_number=usb_device.serial_number, + manufacturer=usb_device.manufacturer, + description=usb_device.description, + ) diff --git a/tests/components/homeassistant_connect_zbt2/test_config_flow.py b/tests/components/homeassistant_connect_zbt2/test_config_flow.py index dc32741165e..8178cac5f60 100644 --- a/tests/components/homeassistant_connect_zbt2/test_config_flow.py +++ b/tests/components/homeassistant_connect_zbt2/test_config_flow.py @@ -10,14 +10,19 @@ from homeassistant.components.homeassistant_hardware.firmware_config_flow import STEP_PICK_FIRMWARE_THREAD, STEP_PICK_FIRMWARE_ZIGBEE, ) +from homeassistant.components.homeassistant_hardware.helpers import ( + async_notify_firmware_info, +) from homeassistant.components.homeassistant_hardware.util import ( ApplicationType, FirmwareInfo, ) +from homeassistant.components.usb import USBDevice from homeassistant.config_entries import ConfigFlowResult from homeassistant.core import HomeAssistant from homeassistant.data_entry_flow import FlowResultType from homeassistant.helpers.service_info.usb import UsbServiceInfo +from homeassistant.setup import async_setup_component from .common import USB_DATA_ZBT2 @@ -382,3 +387,122 @@ async def test_duplicate_discovery_updates_usb_path(hass: HomeAssistant) -> None assert result["reason"] == "already_configured" assert config_entry.data["device"] == USB_DATA_ZBT2.device + + +async def test_firmware_callback_auto_creates_entry(hass: HomeAssistant) -> None: + """Test that firmware notification triggers import flow that auto-creates config entry.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + await async_setup_component(hass, "usb", {}) + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": "usb"}, data=USB_DATA_ZBT2 + ) + + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "pick_firmware" + + usb_device = USBDevice( + device=USB_DATA_ZBT2.device, + vid=USB_DATA_ZBT2.vid, + pid=USB_DATA_ZBT2.pid, + serial_number=USB_DATA_ZBT2.serial_number, + manufacturer=USB_DATA_ZBT2.manufacturer, + description=USB_DATA_ZBT2.description, + ) + + with patch( + "homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path", + return_value=usb_device, + ): + await async_notify_firmware_info( + hass, + "zha", + FirmwareInfo( + device=USB_DATA_ZBT2.device, + firmware_type=ApplicationType.EZSP, + firmware_version="7.4.4.0", + owners=[], + source="zha", + ), + ) + + await hass.async_block_till_done() + + # The config entry was auto-created + entries = hass.config_entries.async_entries(DOMAIN) + assert len(entries) == 1 + assert entries[0].data == { + "device": USB_DATA_ZBT2.device, + "firmware": ApplicationType.EZSP.value, + "firmware_version": "7.4.4.0", + "vid": USB_DATA_ZBT2.vid, + "pid": USB_DATA_ZBT2.pid, + "serial_number": USB_DATA_ZBT2.serial_number, + "manufacturer": USB_DATA_ZBT2.manufacturer, + "product": USB_DATA_ZBT2.description, + } + + # The discovery flow is gone + assert not hass.config_entries.flow.async_progress_by_handler(DOMAIN) + + +async def test_firmware_callback_updates_existing_entry(hass: HomeAssistant) -> None: + """Test that firmware notification updates existing config entry device path.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + await async_setup_component(hass, "usb", {}) + + # Create existing config entry with old device path + config_entry = MockConfigEntry( + domain=DOMAIN, + data={ + "firmware": ApplicationType.EZSP.value, + "firmware_version": "7.4.4.0", + "device": "/dev/oldpath", + "vid": USB_DATA_ZBT2.vid, + "pid": USB_DATA_ZBT2.pid, + "serial_number": USB_DATA_ZBT2.serial_number, + "manufacturer": USB_DATA_ZBT2.manufacturer, + "product": USB_DATA_ZBT2.description, + }, + unique_id=( + f"{USB_DATA_ZBT2.vid}:{USB_DATA_ZBT2.pid}_" + f"{USB_DATA_ZBT2.serial_number}_" + f"{USB_DATA_ZBT2.manufacturer}_" + f"{USB_DATA_ZBT2.description}" + ), + ) + config_entry.add_to_hass(hass) + + usb_device = USBDevice( + device=USB_DATA_ZBT2.device, + vid=USB_DATA_ZBT2.vid, + pid=USB_DATA_ZBT2.pid, + serial_number=USB_DATA_ZBT2.serial_number, + manufacturer=USB_DATA_ZBT2.manufacturer, + description=USB_DATA_ZBT2.description, + ) + + with patch( + "homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path", + return_value=usb_device, + ): + await async_notify_firmware_info( + hass, + "zha", + FirmwareInfo( + device=USB_DATA_ZBT2.device, + firmware_type=ApplicationType.EZSP, + firmware_version="7.4.4.0", + owners=[], + source="zha", + ), + ) + + await hass.async_block_till_done() + + # The config entry device path should be updated + assert config_entry.data["device"] == USB_DATA_ZBT2.device + + # No new config entry was created + entries = hass.config_entries.async_entries(DOMAIN) + assert len(entries) == 1 diff --git a/tests/components/homeassistant_hardware/test_helpers.py b/tests/components/homeassistant_hardware/test_helpers.py index 3c741df2f23..540d2ca7afd 100644 --- a/tests/components/homeassistant_hardware/test_helpers.py +++ b/tests/components/homeassistant_hardware/test_helpers.py @@ -1,7 +1,8 @@ """Test hardware helpers.""" +from collections.abc import Callable import logging -from unittest.mock import AsyncMock, MagicMock, Mock, call +from unittest.mock import AsyncMock, MagicMock, Mock, call, patch import pytest @@ -19,6 +20,7 @@ from homeassistant.components.homeassistant_hardware.util import ( ApplicationType, FirmwareInfo, ) +from homeassistant.components.usb import USBDevice from homeassistant.config_entries import ConfigEntryState from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component @@ -257,3 +259,113 @@ async def test_firmware_update_context_manager(hass: HomeAssistant) -> None: # Should be cleaned up after first context assert not async_is_firmware_update_in_progress(hass, device_path) + + +async def test_dispatcher_callback_self_unregister(hass: HomeAssistant) -> None: + """Test callbacks can unregister themselves during notification.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + + called_callbacks = [] + unregister_funcs = {} + + def create_self_unregistering_callback(name: str) -> Callable[[FirmwareInfo], None]: + def callback(firmware_info: FirmwareInfo) -> None: + called_callbacks.append(name) + unregister_funcs[name]() + + return callback + + callback1 = create_self_unregistering_callback("callback1") + callback2 = create_self_unregistering_callback("callback2") + callback3 = create_self_unregistering_callback("callback3") + + # Register all three callbacks and store their unregister functions + unregister_funcs["callback1"] = async_register_firmware_info_callback( + hass, "/dev/serial/by-id/device1", callback1 + ) + unregister_funcs["callback2"] = async_register_firmware_info_callback( + hass, "/dev/serial/by-id/device1", callback2 + ) + unregister_funcs["callback3"] = async_register_firmware_info_callback( + hass, "/dev/serial/by-id/device1", callback3 + ) + + # All callbacks should be called and unregister themselves + await async_notify_firmware_info(hass, "zha", firmware_info=FIRMWARE_INFO_EZSP) + assert set(called_callbacks) == {"callback1", "callback2", "callback3"} + + # No callbacks should be called since they all unregistered + called_callbacks.clear() + await async_notify_firmware_info(hass, "zha", firmware_info=FIRMWARE_INFO_EZSP) + assert not called_callbacks + + +async def test_firmware_callback_no_usb_device( + hass: HomeAssistant, caplog: pytest.LogCaptureFixture +) -> None: + """Test firmware notification when usb_device_from_path returns None.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + await async_setup_component(hass, "usb", {}) + + with ( + patch( + "homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path", + return_value=None, + ), + caplog.at_level(logging.DEBUG), + ): + await async_notify_firmware_info( + hass, + "zha", + FirmwareInfo( + device="/dev/ttyUSB99", + firmware_type=ApplicationType.EZSP, + firmware_version="7.4.4.0", + owners=[], + source="zha", + ), + ) + await hass.async_block_till_done() + + # This isn't a codepath that's expected but we won't fail in this case, just log + assert "Cannot find USB for path /dev/ttyUSB99" in caplog.text + + +async def test_firmware_callback_no_hardware_domain( + hass: HomeAssistant, caplog: pytest.LogCaptureFixture +) -> None: + """Test firmware notification when no hardware domain is found for device.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + await async_setup_component(hass, "usb", {}) + + # Create a USB device that doesn't match any hardware integration + usb_device = USBDevice( + device="/dev/ttyUSB0", + vid="9999", + pid="9999", + serial_number="TEST123", + manufacturer="Test Manufacturer", + description="Test Device", + ) + + with ( + patch( + "homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path", + return_value=usb_device, + ), + caplog.at_level(logging.DEBUG), + ): + await async_notify_firmware_info( + hass, + "zha", + FirmwareInfo( + device="/dev/ttyUSB0", + firmware_type=ApplicationType.EZSP, + firmware_version="7.4.4.0", + owners=[], + source="zha", + ), + ) + await hass.async_block_till_done() + + assert "No hardware integration found for device" in caplog.text diff --git a/tests/components/homeassistant_sky_connect/test_config_flow.py b/tests/components/homeassistant_sky_connect/test_config_flow.py index 01478900c60..b0d58473a67 100644 --- a/tests/components/homeassistant_sky_connect/test_config_flow.py +++ b/tests/components/homeassistant_sky_connect/test_config_flow.py @@ -10,6 +10,9 @@ from homeassistant.components.homeassistant_hardware.firmware_config_flow import STEP_PICK_FIRMWARE_THREAD, STEP_PICK_FIRMWARE_ZIGBEE, ) +from homeassistant.components.homeassistant_hardware.helpers import ( + async_notify_firmware_info, +) from homeassistant.components.homeassistant_hardware.silabs_multiprotocol_addon import ( CONF_DISABLE_MULTI_PAN, get_flasher_addon_manager, @@ -20,10 +23,12 @@ from homeassistant.components.homeassistant_hardware.util import ( FirmwareInfo, ) from homeassistant.components.homeassistant_sky_connect.const import DOMAIN +from homeassistant.components.usb import USBDevice from homeassistant.config_entries import ConfigFlowResult from homeassistant.core import HomeAssistant from homeassistant.data_entry_flow import FlowResultType from homeassistant.helpers.service_info.usb import UsbServiceInfo +from homeassistant.setup import async_setup_component from .common import USB_DATA_SKY, USB_DATA_ZBT1 @@ -426,3 +431,187 @@ async def test_options_flow_multipan_uninstall( # We've reverted the firmware back to Zigbee assert config_entry.data["firmware"] == "ezsp" + + +@pytest.mark.parametrize( + ("usb_data", "model"), + [ + (USB_DATA_SKY, "Home Assistant SkyConnect"), + (USB_DATA_ZBT1, "Home Assistant Connect ZBT-1"), + ], +) +async def test_firmware_callback_auto_creates_entry( + usb_data: UsbServiceInfo, + model: str, + hass: HomeAssistant, +) -> None: + """Test that firmware notification triggers import flow that auto-creates config entry.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + await async_setup_component(hass, "usb", {}) + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": "usb"}, data=usb_data + ) + + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "pick_firmware" + + usb_device = USBDevice( + device=usb_data.device, + vid=usb_data.vid, + pid=usb_data.pid, + serial_number=usb_data.serial_number, + manufacturer=usb_data.manufacturer, + description=usb_data.description, + ) + + with patch( + "homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path", + return_value=usb_device, + ): + await async_notify_firmware_info( + hass, + "zha", + FirmwareInfo( + device=usb_data.device, + firmware_type=ApplicationType.EZSP, + firmware_version="7.4.4.0", + owners=[], + source="zha", + ), + ) + + await hass.async_block_till_done() + + # The config entry was auto-created + entries = hass.config_entries.async_entries(DOMAIN) + assert len(entries) == 1 + assert entries[0].data == { + "device": usb_data.device, + "firmware": ApplicationType.EZSP.value, + "firmware_version": "7.4.4.0", + "vid": usb_data.vid, + "pid": usb_data.pid, + "serial_number": usb_data.serial_number, + "manufacturer": usb_data.manufacturer, + "description": usb_data.description, + "product": usb_data.description, + } + + # The discovery flow is gone + assert not hass.config_entries.flow.async_progress_by_handler(DOMAIN) + + +@pytest.mark.parametrize( + ("usb_data", "model"), + [ + (USB_DATA_SKY, "Home Assistant SkyConnect"), + (USB_DATA_ZBT1, "Home Assistant Connect ZBT-1"), + ], +) +async def test_duplicate_usb_discovery_aborts_early( + usb_data: UsbServiceInfo, model: str, hass: HomeAssistant +) -> None: + """Test USB discovery aborts early when unique_id exists before serial path resolution.""" + # Create existing config entry + config_entry = MockConfigEntry( + domain=DOMAIN, + data={ + "firmware": "ezsp", + "device": "/dev/oldpath", + "manufacturer": usb_data.manufacturer, + "pid": usb_data.pid, + "description": usb_data.description, + "product": usb_data.description, + "serial_number": usb_data.serial_number, + "vid": usb_data.vid, + }, + unique_id=( + f"{usb_data.vid}:{usb_data.pid}_" + f"{usb_data.serial_number}_" + f"{usb_data.manufacturer}_" + f"{usb_data.description}" + ), + ) + config_entry.add_to_hass(hass) + + # Try to discover the same device with a different path + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": "usb"}, data=usb_data + ) + + # Should abort before get_serial_by_id is called + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "already_configured" + + +@pytest.mark.parametrize( + ("usb_data", "model"), + [ + (USB_DATA_SKY, "Home Assistant SkyConnect"), + (USB_DATA_ZBT1, "Home Assistant Connect ZBT-1"), + ], +) +async def test_firmware_callback_updates_existing_entry( + usb_data: UsbServiceInfo, model: str, hass: HomeAssistant +) -> None: + """Test that firmware notification updates existing config entry device path.""" + await async_setup_component(hass, "homeassistant_hardware", {}) + await async_setup_component(hass, "usb", {}) + + # Create existing config entry with old device path + config_entry = MockConfigEntry( + domain=DOMAIN, + data={ + "firmware": ApplicationType.EZSP.value, + "firmware_version": "7.4.4.0", + "device": "/dev/oldpath", + "vid": usb_data.vid, + "pid": usb_data.pid, + "serial_number": usb_data.serial_number, + "manufacturer": usb_data.manufacturer, + "description": usb_data.description, + "product": usb_data.description, + }, + unique_id=( + f"{usb_data.vid}:{usb_data.pid}_" + f"{usb_data.serial_number}_" + f"{usb_data.manufacturer}_" + f"{usb_data.description}" + ), + ) + config_entry.add_to_hass(hass) + + usb_device = USBDevice( + device=usb_data.device, + vid=usb_data.vid, + pid=usb_data.pid, + serial_number=usb_data.serial_number, + manufacturer=usb_data.manufacturer, + description=usb_data.description, + ) + + with patch( + "homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path", + return_value=usb_device, + ): + await async_notify_firmware_info( + hass, + "zha", + FirmwareInfo( + device=usb_data.device, + firmware_type=ApplicationType.EZSP, + firmware_version="7.4.4.0", + owners=[], + source="zha", + ), + ) + + await hass.async_block_till_done() + + # The config entry device path should be updated + assert config_entry.data["device"] == usb_data.device + + # No new config entry was created + entries = hass.config_entries.async_entries(DOMAIN) + assert len(entries) == 1 diff --git a/tests/components/usb/test_init.py b/tests/components/usb/test_init.py index 3a56e929b22..49ebbf42af5 100644 --- a/tests/components/usb/test_init.py +++ b/tests/components/usb/test_init.py @@ -1,6 +1,7 @@ """Tests for the USB Discovery integration.""" import asyncio +import dataclasses from datetime import timedelta import logging import os @@ -11,6 +12,7 @@ import pytest from homeassistant.components import usb from homeassistant.components.usb.models import USBDevice +from homeassistant.components.usb.utils import usb_device_from_path from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP from homeassistant.core import HomeAssistant from homeassistant.helpers.service_info.usb import UsbServiceInfo @@ -1383,3 +1385,126 @@ async def test_register_port_event_callback_failure( assert caplog.text.count("Error in USB port event callback") == 2 assert "Failure 1" in caplog.text assert "Failure 2" in caplog.text + + +def test_usb_device_from_path_with_symlinks() -> None: + """Test usb_device_from_path resolves devices using symlink mapping.""" + # Mock /dev/serial/by-id exists and contains symlinks + entry1 = MagicMock(spec_set=os.DirEntry) + entry1.is_symlink.return_value = True + entry1.path = "/dev/serial/by-id/usb-device1" + + entry2 = MagicMock(spec_set=os.DirEntry) + entry2.is_symlink.return_value = True + entry2.path = "/dev/serial/by-id/usb-device2" + + def mock_realpath(path: str) -> str: + realpath_map = { + "/dev/serial/by-id/usb-device1": "/dev/ttyUSB0", + "/dev/serial/by-id/usb-device2": "/dev/ttyUSB1", + "/dev/ttyUSB0": "/dev/ttyUSB0", + "/dev/ttyUSB1": "/dev/ttyUSB1", + } + return realpath_map.get(path, path) + + usb_device = USBDevice( + device="/dev/ttyUSB0", + vid="1234", + pid="5678", + serial_number="ABC123", + manufacturer="Test Manufacturer", + description="Test Device", + ) + + with ( + patch("os.path.isdir", return_value=True), + patch("os.scandir", return_value=[entry1, entry2]), + patch("os.path.realpath", side_effect=mock_realpath), + patch( + "homeassistant.components.usb.utils.scan_serial_ports", + return_value=[usb_device], + ), + ): + dev_from_path = usb_device_from_path("/dev/serial/by-id/usb-device1") + + # The USB device for the given path differs from the `scan_serial_ports` only by its + # `device` pointing to a symlink + assert dev_from_path == dataclasses.replace( + usb_device, device="/dev/serial/by-id/usb-device1" + ) + + +def test_usb_device_from_path_with_realpath_match() -> None: + """Test usb_device_from_path falls back to the original path without a symlink.""" + usb_device = USBDevice( + device="/dev/ttyUSB0", + vid="1234", + pid="5678", + serial_number="ABC123", + manufacturer="Test Manufacturer", + description="Test Device", + ) + + with ( + patch("os.path.isdir", return_value=True), + patch("os.scandir", return_value=[]), + patch("os.path.realpath", side_effect=lambda x: x), + patch( + "homeassistant.components.usb.utils.scan_serial_ports", + return_value=[usb_device], + ), + ): + dev_from_path = usb_device_from_path("/dev/ttyUSB0") + + # There is no symlink for the device so we must keep using the base `/dev/` path + assert dev_from_path == usb_device + + +def test_usb_device_from_path_no_match() -> None: + """Test usb_device_from_path returns None when device not found.""" + usb_device = USBDevice( + device="/dev/ttyUSB0", + vid="1234", + pid="5678", + serial_number="ABC123", + manufacturer="Test Manufacturer", + description="Test Device", + ) + + with ( + patch("os.path.isdir", return_value=True), + patch("os.scandir", return_value=[]), + patch("os.path.realpath", side_effect=lambda x: x), + patch( + "homeassistant.components.usb.utils.scan_serial_ports", + return_value=[usb_device], + ), + ): + dev_from_path = usb_device_from_path("/dev/ttyUSB99") + + assert dev_from_path is None + + +def test_usb_device_from_path_no_by_id_dir() -> None: + """Test usb_device_from_path when /dev/serial/by-id doesn't exist.""" + usb_device = USBDevice( + device="/dev/ttyUSB0", + vid="1234", + pid="5678", + serial_number="ABC123", + manufacturer="Test Manufacturer", + description="Test Device", + ) + + with ( + patch("os.path.isdir", return_value=False), + patch("os.path.realpath", side_effect=lambda x: x), + patch( + "homeassistant.components.usb.utils.scan_serial_ports", + return_value=[usb_device], + ), + ): + dev_from_path = usb_device_from_path("/dev/ttyUSB0") + + # We have no symlinks so we use the base path + assert dev_from_path == usb_device