Automatically setup hardware integrations when firmware info is published by an integration (#154030)

This commit is contained in:
puddly
2025-10-13 10:26:01 -04:00
committed by GitHub
parent 3c3b4ef14a
commit f1e0954c61
12 changed files with 826 additions and 63 deletions

View File

@@ -7,11 +7,18 @@ from typing import TYPE_CHECKING, Any, Protocol
from homeassistant.components import usb
from homeassistant.components.homeassistant_hardware import firmware_config_flow
from homeassistant.components.homeassistant_hardware.helpers import (
HardwareFirmwareDiscoveryInfo,
)
from homeassistant.components.homeassistant_hardware.util import (
ApplicationType,
FirmwareInfo,
ResetTarget,
)
from homeassistant.components.usb import (
usb_service_info_from_device,
usb_unique_id_from_service_info,
)
from homeassistant.config_entries import (
ConfigEntry,
ConfigEntryBaseFlow,
@@ -123,22 +130,16 @@ class HomeAssistantConnectZBT2ConfigFlow(
async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult:
"""Handle usb discovery."""
device = discovery_info.device
vid = discovery_info.vid
pid = discovery_info.pid
serial_number = discovery_info.serial_number
manufacturer = discovery_info.manufacturer
description = discovery_info.description
unique_id = f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}"
unique_id = usb_unique_id_from_service_info(discovery_info)
device = discovery_info.device = await self.hass.async_add_executor_job(
discovery_info.device = await self.hass.async_add_executor_job(
usb.get_serial_by_id, discovery_info.device
)
try:
await self.async_set_unique_id(unique_id)
finally:
self._abort_if_unique_id_configured(updates={DEVICE: device})
self._abort_if_unique_id_configured(updates={DEVICE: discovery_info.device})
self._usb_info = discovery_info
@@ -148,6 +149,24 @@ class HomeAssistantConnectZBT2ConfigFlow(
return await self.async_step_confirm()
async def async_step_import(
self, fw_discovery_info: HardwareFirmwareDiscoveryInfo
) -> ConfigFlowResult:
"""Handle import from ZHA/OTBR firmware notification."""
assert fw_discovery_info["usb_device"] is not None
usb_info = usb_service_info_from_device(fw_discovery_info["usb_device"])
unique_id = usb_unique_id_from_service_info(usb_info)
if await self.async_set_unique_id(unique_id, raise_on_progress=False):
self._abort_if_unique_id_configured(updates={DEVICE: usb_info.device})
self._usb_info = usb_info
self._device = usb_info.device
self._hardware_name = HARDWARE_NAME
self._probed_firmware_info = fw_discovery_info["firmware_info"]
return self._async_flow_finished()
def _async_flow_finished(self) -> ConfigFlowResult:
"""Create the config entry."""
assert self._usb_info is not None

View File

@@ -19,6 +19,12 @@ DATA_COMPONENT: HassKey[HardwareInfoDispatcher] = HassKey(DOMAIN)
ZHA_DOMAIN = "zha"
OTBR_DOMAIN = "otbr"
HARDWARE_INTEGRATION_DOMAINS = {
"homeassistant_sky_connect",
"homeassistant_connect_zbt2",
"homeassistant_yellow",
}
OTBR_ADDON_NAME = "OpenThread Border Router"
OTBR_ADDON_MANAGER_DATA = "openthread_border_router"
OTBR_ADDON_SLUG = "core_openthread_border_router"

View File

@@ -6,19 +6,33 @@ from collections import defaultdict
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager
import logging
from typing import TYPE_CHECKING, Protocol
from typing import TYPE_CHECKING, Protocol, TypedDict
from homeassistant.config_entries import ConfigEntry
from homeassistant.components.usb import (
USBDevice,
async_get_usb_matchers_for_device,
usb_device_from_path,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback
from . import DATA_COMPONENT
from .const import HARDWARE_INTEGRATION_DOMAINS
if TYPE_CHECKING:
from .util import FirmwareInfo
_LOGGER = logging.getLogger(__name__)
class HardwareFirmwareDiscoveryInfo(TypedDict):
"""Data for triggering hardware integration discovery via firmware notification."""
usb_device: USBDevice | None
firmware_info: FirmwareInfo
class SyncHardwareFirmwareInfoModule(Protocol):
"""Protocol type for Home Assistant Hardware firmware info platform modules."""
@@ -46,6 +60,23 @@ type HardwareFirmwareInfoModule = (
)
@hass_callback
def async_get_hardware_domain_for_usb_device(
hass: HomeAssistant, usb_device: USBDevice
) -> str | None:
"""Identify which hardware domain should handle a USB device."""
matched = async_get_usb_matchers_for_device(hass, usb_device)
hw_domains = {match["domain"] for match in matched} & HARDWARE_INTEGRATION_DOMAINS
if not hw_domains:
return None
# We can never have two hardware integrations overlap in discovery
assert len(hw_domains) == 1
return list(hw_domains)[0]
class HardwareInfoDispatcher:
"""Central dispatcher for hardware/firmware information."""
@@ -94,7 +125,7 @@ class HardwareInfoDispatcher:
"Received firmware info notification from %r: %s", domain, firmware_info
)
for callback in self._notification_callbacks.get(firmware_info.device, []):
for callback in list(self._notification_callbacks[firmware_info.device]):
try:
callback(firmware_info)
except Exception:
@@ -102,6 +133,48 @@ class HardwareInfoDispatcher:
"Error while notifying firmware info listener %s", callback
)
await self._async_trigger_hardware_discovery(firmware_info)
async def _async_trigger_hardware_discovery(
self, firmware_info: FirmwareInfo
) -> None:
"""Trigger hardware integration config flows from firmware info.
Identifies which hardware integration should handle the device based on
USB matchers, then triggers an import flow for only that integration.
"""
usb_device = await self.hass.async_add_executor_job(
usb_device_from_path, firmware_info.device
)
if usb_device is None:
_LOGGER.debug("Cannot find USB for path %s", firmware_info.device)
return
hardware_domain = async_get_hardware_domain_for_usb_device(
self.hass, usb_device
)
if hardware_domain is None:
_LOGGER.debug("No hardware integration found for device %s", usb_device)
return
_LOGGER.debug(
"Triggering %s import flow for device %s",
hardware_domain,
firmware_info.device,
)
await self.hass.config_entries.flow.async_init(
hardware_domain,
context={"source": SOURCE_IMPORT},
data=HardwareFirmwareDiscoveryInfo(
usb_device=usb_device,
firmware_info=firmware_info,
),
)
async def iter_firmware_info(self) -> AsyncIterator[FirmwareInfo]:
"""Iterate over all firmware information for all hardware."""
for domain, fw_info_module in self._providers.items():

View File

@@ -3,6 +3,7 @@
"name": "Home Assistant Hardware",
"after_dependencies": ["hassio"],
"codeowners": ["@home-assistant/core"],
"dependencies": ["usb"],
"documentation": "https://www.home-assistant.io/integrations/homeassistant_hardware",
"integration_type": "system",
"requirements": [

View File

@@ -10,10 +10,17 @@ from homeassistant.components.homeassistant_hardware import (
firmware_config_flow,
silabs_multiprotocol_addon,
)
from homeassistant.components.homeassistant_hardware.helpers import (
HardwareFirmwareDiscoveryInfo,
)
from homeassistant.components.homeassistant_hardware.util import (
ApplicationType,
FirmwareInfo,
)
from homeassistant.components.usb import (
usb_service_info_from_device,
usb_unique_id_from_service_info,
)
from homeassistant.config_entries import (
ConfigEntry,
ConfigEntryBaseFlow,
@@ -142,16 +149,10 @@ class HomeAssistantSkyConnectConfigFlow(
async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult:
"""Handle usb discovery."""
device = discovery_info.device
vid = discovery_info.vid
pid = discovery_info.pid
serial_number = discovery_info.serial_number
manufacturer = discovery_info.manufacturer
description = discovery_info.description
unique_id = f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}"
unique_id = usb_unique_id_from_service_info(discovery_info)
if await self.async_set_unique_id(unique_id):
self._abort_if_unique_id_configured(updates={DEVICE: device})
self._abort_if_unique_id_configured(updates={DEVICE: discovery_info.device})
discovery_info.device = await self.hass.async_add_executor_job(
usb.get_serial_by_id, discovery_info.device
@@ -159,8 +160,10 @@ class HomeAssistantSkyConnectConfigFlow(
self._usb_info = discovery_info
assert description is not None
self._hw_variant = HardwareVariant.from_usb_product_name(description)
assert discovery_info.description is not None
self._hw_variant = HardwareVariant.from_usb_product_name(
discovery_info.description
)
# Set parent class attributes
self._device = self._usb_info.device
@@ -168,6 +171,26 @@ class HomeAssistantSkyConnectConfigFlow(
return await self.async_step_confirm()
async def async_step_import(
self, fw_discovery_info: HardwareFirmwareDiscoveryInfo
) -> ConfigFlowResult:
"""Handle import from ZHA/OTBR firmware notification."""
assert fw_discovery_info["usb_device"] is not None
usb_info = usb_service_info_from_device(fw_discovery_info["usb_device"])
unique_id = usb_unique_id_from_service_info(usb_info)
if await self.async_set_unique_id(unique_id, raise_on_progress=False):
self._abort_if_unique_id_configured(updates={DEVICE: usb_info.device})
self._usb_info = usb_info
assert usb_info.description is not None
self._hw_variant = HardwareVariant.from_usb_product_name(usb_info.description)
self._device = usb_info.device
self._hardware_name = self._hw_variant.full_name
self._probed_firmware_info = fw_discovery_info["firmware_info"]
return self._async_flow_finished()
def _async_flow_finished(self) -> ConfigFlowResult:
"""Create the config entry."""
assert self._usb_info is not None

View File

@@ -41,6 +41,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
firmware = ApplicationType(entry.data[FIRMWARE])
# Auto start the multiprotocol addon if it is in use
if firmware is ApplicationType.CPC:
try:
await check_multi_pan_addon(hass)

View File

@@ -6,7 +6,6 @@ import asyncio
from collections.abc import Callable, Coroutine, Sequence
import dataclasses
from datetime import datetime, timedelta
import fnmatch
from functools import partial
import logging
import os
@@ -43,7 +42,11 @@ from .const import DOMAIN
from .models import USBDevice
from .utils import (
scan_serial_ports,
usb_device_from_path, # noqa: F401
usb_device_from_port, # noqa: F401
usb_device_matches_matcher,
usb_service_info_from_device, # noqa: F401
usb_unique_id_from_service_info, # noqa: F401
)
_LOGGER = logging.getLogger(__name__)
@@ -121,7 +124,7 @@ def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> boo
usb_discovery: USBDiscovery = hass.data[DOMAIN]
return any(
_is_matching(
usb_device_matches_matcher(
USBDevice(
device=device,
vid=vid,
@@ -143,6 +146,15 @@ def async_is_plugged_in(hass: HomeAssistant, matcher: USBCallbackMatcher) -> boo
)
@hass_callback
def async_get_usb_matchers_for_device(
hass: HomeAssistant, device: USBDevice
) -> list[USBMatcher]:
"""Return a list of matchers that match the given device."""
usb_discovery: USBDiscovery = hass.data[DOMAIN]
return usb_discovery.async_get_usb_matchers_for_device(device)
_DEPRECATED_UsbServiceInfo = DeprecatedConstant(
_UsbServiceInfo,
"homeassistant.helpers.service_info.usb.UsbServiceInfo",
@@ -214,34 +226,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
def _fnmatch_lower(name: str | None, pattern: str) -> bool:
"""Match a lowercase version of the name."""
if name is None:
return False
return fnmatch.fnmatch(name.lower(), pattern)
def _is_matching(device: USBDevice, matcher: USBMatcher | USBCallbackMatcher) -> bool:
"""Return True if a device matches."""
if "vid" in matcher and device.vid != matcher["vid"]:
return False
if "pid" in matcher and device.pid != matcher["pid"]:
return False
if "serial_number" in matcher and not _fnmatch_lower(
device.serial_number, matcher["serial_number"]
):
return False
if "manufacturer" in matcher and not _fnmatch_lower(
device.manufacturer, matcher["manufacturer"]
):
return False
if "description" in matcher and not _fnmatch_lower(
device.description, matcher["description"]
):
return False
return True
async def async_request_scan(hass: HomeAssistant) -> None:
"""Request a USB scan."""
usb_discovery: USBDiscovery = hass.data[DOMAIN]
@@ -383,6 +367,29 @@ class USBDiscovery:
return _async_remove_callback
@hass_callback
def async_get_usb_matchers_for_device(self, device: USBDevice) -> list[USBMatcher]:
"""Return a list of matchers that match the given device."""
matched = [
matcher
for matcher in self.usb
if usb_device_matches_matcher(device, matcher)
]
if not matched:
return []
# Sort by specificity (most fields matched first)
sorted_by_most_targeted = sorted(matched, key=lambda item: -len(item))
# Only return matchers with the same specificity as the most specific one
most_matched_fields = len(sorted_by_most_targeted[0])
return [
matcher
for matcher in sorted_by_most_targeted
if len(matcher) == most_matched_fields
]
async def _async_process_discovered_usb_device(self, device: USBDevice) -> None:
"""Process a USB discovery."""
_LOGGER.debug("Discovered USB Device: %s", device)
@@ -391,21 +398,13 @@ class USBDiscovery:
return
self.seen.add(device_tuple)
matched = [matcher for matcher in self.usb if _is_matching(device, matcher)]
matched = self.async_get_usb_matchers_for_device(device)
if not matched:
return
service_info: _UsbServiceInfo | None = None
sorted_by_most_targeted = sorted(matched, key=lambda item: -len(item))
most_matched_fields = len(sorted_by_most_targeted[0])
for matcher in sorted_by_most_targeted:
# If there is a less targeted match, we only
# want the most targeted match
if len(matcher) < most_matched_fields:
break
for matcher in matched:
if service_info is None:
service_info = _UsbServiceInfo(
device=await self.hass.async_add_executor_job(

View File

@@ -3,10 +3,15 @@
from __future__ import annotations
from collections.abc import Sequence
import fnmatch
import os
from serial.tools.list_ports import comports
from serial.tools.list_ports_common import ListPortInfo
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.loader import USBMatcher
from .models import USBDevice
@@ -29,3 +34,89 @@ def scan_serial_ports() -> Sequence[USBDevice]:
for port in comports()
if port.vid is not None or port.pid is not None
]
def usb_device_from_path(device_path: str) -> USBDevice | None:
"""Get USB device info from a device path."""
# Scan all symlinks first
by_id = "/dev/serial/by-id"
realpath_to_by_id: dict[str, str] = {}
if os.path.isdir(by_id):
for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
realpath_to_by_id[os.path.realpath(path)] = path
# Then compare the actual path to each serial port's
device_path_real = os.path.realpath(device_path)
for device in scan_serial_ports():
normalized_path = realpath_to_by_id.get(device.device, device.device)
if (
normalized_path == device_path
or os.path.realpath(device.device) == device_path_real
):
return USBDevice(
device=normalized_path,
vid=device.vid,
pid=device.pid,
serial_number=device.serial_number,
manufacturer=device.manufacturer,
description=device.description,
)
return None
def _fnmatch_lower(name: str | None, pattern: str) -> bool:
"""Match a lowercase version of the name."""
if name is None:
return False
return fnmatch.fnmatch(name.lower(), pattern)
def usb_device_matches_matcher(device: USBDevice, matcher: USBMatcher) -> bool:
"""Check if a USB device matches a USB matcher."""
if "vid" in matcher and device.vid != matcher["vid"]:
return False
if "pid" in matcher and device.pid != matcher["pid"]:
return False
if "serial_number" in matcher and not _fnmatch_lower(
device.serial_number, matcher["serial_number"]
):
return False
if "manufacturer" in matcher and not _fnmatch_lower(
device.manufacturer, matcher["manufacturer"]
):
return False
if "description" in matcher and not _fnmatch_lower(
device.description, matcher["description"]
):
return False
return True
def usb_unique_id_from_service_info(usb_info: UsbServiceInfo) -> str:
"""Generate a unique ID from USB service info."""
return (
f"{usb_info.vid}:{usb_info.pid}_"
f"{usb_info.serial_number}_"
f"{usb_info.manufacturer}_"
f"{usb_info.description}"
)
def usb_service_info_from_device(usb_device: USBDevice) -> UsbServiceInfo:
"""Convert a USBDevice to UsbServiceInfo."""
return UsbServiceInfo(
device=usb_device.device,
vid=usb_device.vid,
pid=usb_device.pid,
serial_number=usb_device.serial_number,
manufacturer=usb_device.manufacturer,
description=usb_device.description,
)

View File

@@ -10,14 +10,19 @@ from homeassistant.components.homeassistant_hardware.firmware_config_flow import
STEP_PICK_FIRMWARE_THREAD,
STEP_PICK_FIRMWARE_ZIGBEE,
)
from homeassistant.components.homeassistant_hardware.helpers import (
async_notify_firmware_info,
)
from homeassistant.components.homeassistant_hardware.util import (
ApplicationType,
FirmwareInfo,
)
from homeassistant.components.usb import USBDevice
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.setup import async_setup_component
from .common import USB_DATA_ZBT2
@@ -382,3 +387,122 @@ async def test_duplicate_discovery_updates_usb_path(hass: HomeAssistant) -> None
assert result["reason"] == "already_configured"
assert config_entry.data["device"] == USB_DATA_ZBT2.device
async def test_firmware_callback_auto_creates_entry(hass: HomeAssistant) -> None:
"""Test that firmware notification triggers import flow that auto-creates config entry."""
await async_setup_component(hass, "homeassistant_hardware", {})
await async_setup_component(hass, "usb", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "usb"}, data=USB_DATA_ZBT2
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "pick_firmware"
usb_device = USBDevice(
device=USB_DATA_ZBT2.device,
vid=USB_DATA_ZBT2.vid,
pid=USB_DATA_ZBT2.pid,
serial_number=USB_DATA_ZBT2.serial_number,
manufacturer=USB_DATA_ZBT2.manufacturer,
description=USB_DATA_ZBT2.description,
)
with patch(
"homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path",
return_value=usb_device,
):
await async_notify_firmware_info(
hass,
"zha",
FirmwareInfo(
device=USB_DATA_ZBT2.device,
firmware_type=ApplicationType.EZSP,
firmware_version="7.4.4.0",
owners=[],
source="zha",
),
)
await hass.async_block_till_done()
# The config entry was auto-created
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
assert entries[0].data == {
"device": USB_DATA_ZBT2.device,
"firmware": ApplicationType.EZSP.value,
"firmware_version": "7.4.4.0",
"vid": USB_DATA_ZBT2.vid,
"pid": USB_DATA_ZBT2.pid,
"serial_number": USB_DATA_ZBT2.serial_number,
"manufacturer": USB_DATA_ZBT2.manufacturer,
"product": USB_DATA_ZBT2.description,
}
# The discovery flow is gone
assert not hass.config_entries.flow.async_progress_by_handler(DOMAIN)
async def test_firmware_callback_updates_existing_entry(hass: HomeAssistant) -> None:
"""Test that firmware notification updates existing config entry device path."""
await async_setup_component(hass, "homeassistant_hardware", {})
await async_setup_component(hass, "usb", {})
# Create existing config entry with old device path
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
"firmware": ApplicationType.EZSP.value,
"firmware_version": "7.4.4.0",
"device": "/dev/oldpath",
"vid": USB_DATA_ZBT2.vid,
"pid": USB_DATA_ZBT2.pid,
"serial_number": USB_DATA_ZBT2.serial_number,
"manufacturer": USB_DATA_ZBT2.manufacturer,
"product": USB_DATA_ZBT2.description,
},
unique_id=(
f"{USB_DATA_ZBT2.vid}:{USB_DATA_ZBT2.pid}_"
f"{USB_DATA_ZBT2.serial_number}_"
f"{USB_DATA_ZBT2.manufacturer}_"
f"{USB_DATA_ZBT2.description}"
),
)
config_entry.add_to_hass(hass)
usb_device = USBDevice(
device=USB_DATA_ZBT2.device,
vid=USB_DATA_ZBT2.vid,
pid=USB_DATA_ZBT2.pid,
serial_number=USB_DATA_ZBT2.serial_number,
manufacturer=USB_DATA_ZBT2.manufacturer,
description=USB_DATA_ZBT2.description,
)
with patch(
"homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path",
return_value=usb_device,
):
await async_notify_firmware_info(
hass,
"zha",
FirmwareInfo(
device=USB_DATA_ZBT2.device,
firmware_type=ApplicationType.EZSP,
firmware_version="7.4.4.0",
owners=[],
source="zha",
),
)
await hass.async_block_till_done()
# The config entry device path should be updated
assert config_entry.data["device"] == USB_DATA_ZBT2.device
# No new config entry was created
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1

View File

@@ -1,7 +1,8 @@
"""Test hardware helpers."""
from collections.abc import Callable
import logging
from unittest.mock import AsyncMock, MagicMock, Mock, call
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch
import pytest
@@ -19,6 +20,7 @@ from homeassistant.components.homeassistant_hardware.util import (
ApplicationType,
FirmwareInfo,
)
from homeassistant.components.usb import USBDevice
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@@ -257,3 +259,113 @@ async def test_firmware_update_context_manager(hass: HomeAssistant) -> None:
# Should be cleaned up after first context
assert not async_is_firmware_update_in_progress(hass, device_path)
async def test_dispatcher_callback_self_unregister(hass: HomeAssistant) -> None:
"""Test callbacks can unregister themselves during notification."""
await async_setup_component(hass, "homeassistant_hardware", {})
called_callbacks = []
unregister_funcs = {}
def create_self_unregistering_callback(name: str) -> Callable[[FirmwareInfo], None]:
def callback(firmware_info: FirmwareInfo) -> None:
called_callbacks.append(name)
unregister_funcs[name]()
return callback
callback1 = create_self_unregistering_callback("callback1")
callback2 = create_self_unregistering_callback("callback2")
callback3 = create_self_unregistering_callback("callback3")
# Register all three callbacks and store their unregister functions
unregister_funcs["callback1"] = async_register_firmware_info_callback(
hass, "/dev/serial/by-id/device1", callback1
)
unregister_funcs["callback2"] = async_register_firmware_info_callback(
hass, "/dev/serial/by-id/device1", callback2
)
unregister_funcs["callback3"] = async_register_firmware_info_callback(
hass, "/dev/serial/by-id/device1", callback3
)
# All callbacks should be called and unregister themselves
await async_notify_firmware_info(hass, "zha", firmware_info=FIRMWARE_INFO_EZSP)
assert set(called_callbacks) == {"callback1", "callback2", "callback3"}
# No callbacks should be called since they all unregistered
called_callbacks.clear()
await async_notify_firmware_info(hass, "zha", firmware_info=FIRMWARE_INFO_EZSP)
assert not called_callbacks
async def test_firmware_callback_no_usb_device(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test firmware notification when usb_device_from_path returns None."""
await async_setup_component(hass, "homeassistant_hardware", {})
await async_setup_component(hass, "usb", {})
with (
patch(
"homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path",
return_value=None,
),
caplog.at_level(logging.DEBUG),
):
await async_notify_firmware_info(
hass,
"zha",
FirmwareInfo(
device="/dev/ttyUSB99",
firmware_type=ApplicationType.EZSP,
firmware_version="7.4.4.0",
owners=[],
source="zha",
),
)
await hass.async_block_till_done()
# This isn't a codepath that's expected but we won't fail in this case, just log
assert "Cannot find USB for path /dev/ttyUSB99" in caplog.text
async def test_firmware_callback_no_hardware_domain(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test firmware notification when no hardware domain is found for device."""
await async_setup_component(hass, "homeassistant_hardware", {})
await async_setup_component(hass, "usb", {})
# Create a USB device that doesn't match any hardware integration
usb_device = USBDevice(
device="/dev/ttyUSB0",
vid="9999",
pid="9999",
serial_number="TEST123",
manufacturer="Test Manufacturer",
description="Test Device",
)
with (
patch(
"homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path",
return_value=usb_device,
),
caplog.at_level(logging.DEBUG),
):
await async_notify_firmware_info(
hass,
"zha",
FirmwareInfo(
device="/dev/ttyUSB0",
firmware_type=ApplicationType.EZSP,
firmware_version="7.4.4.0",
owners=[],
source="zha",
),
)
await hass.async_block_till_done()
assert "No hardware integration found for device" in caplog.text

View File

@@ -10,6 +10,9 @@ from homeassistant.components.homeassistant_hardware.firmware_config_flow import
STEP_PICK_FIRMWARE_THREAD,
STEP_PICK_FIRMWARE_ZIGBEE,
)
from homeassistant.components.homeassistant_hardware.helpers import (
async_notify_firmware_info,
)
from homeassistant.components.homeassistant_hardware.silabs_multiprotocol_addon import (
CONF_DISABLE_MULTI_PAN,
get_flasher_addon_manager,
@@ -20,10 +23,12 @@ from homeassistant.components.homeassistant_hardware.util import (
FirmwareInfo,
)
from homeassistant.components.homeassistant_sky_connect.const import DOMAIN
from homeassistant.components.usb import USBDevice
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.setup import async_setup_component
from .common import USB_DATA_SKY, USB_DATA_ZBT1
@@ -426,3 +431,187 @@ async def test_options_flow_multipan_uninstall(
# We've reverted the firmware back to Zigbee
assert config_entry.data["firmware"] == "ezsp"
@pytest.mark.parametrize(
("usb_data", "model"),
[
(USB_DATA_SKY, "Home Assistant SkyConnect"),
(USB_DATA_ZBT1, "Home Assistant Connect ZBT-1"),
],
)
async def test_firmware_callback_auto_creates_entry(
usb_data: UsbServiceInfo,
model: str,
hass: HomeAssistant,
) -> None:
"""Test that firmware notification triggers import flow that auto-creates config entry."""
await async_setup_component(hass, "homeassistant_hardware", {})
await async_setup_component(hass, "usb", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "usb"}, data=usb_data
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "pick_firmware"
usb_device = USBDevice(
device=usb_data.device,
vid=usb_data.vid,
pid=usb_data.pid,
serial_number=usb_data.serial_number,
manufacturer=usb_data.manufacturer,
description=usb_data.description,
)
with patch(
"homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path",
return_value=usb_device,
):
await async_notify_firmware_info(
hass,
"zha",
FirmwareInfo(
device=usb_data.device,
firmware_type=ApplicationType.EZSP,
firmware_version="7.4.4.0",
owners=[],
source="zha",
),
)
await hass.async_block_till_done()
# The config entry was auto-created
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
assert entries[0].data == {
"device": usb_data.device,
"firmware": ApplicationType.EZSP.value,
"firmware_version": "7.4.4.0",
"vid": usb_data.vid,
"pid": usb_data.pid,
"serial_number": usb_data.serial_number,
"manufacturer": usb_data.manufacturer,
"description": usb_data.description,
"product": usb_data.description,
}
# The discovery flow is gone
assert not hass.config_entries.flow.async_progress_by_handler(DOMAIN)
@pytest.mark.parametrize(
("usb_data", "model"),
[
(USB_DATA_SKY, "Home Assistant SkyConnect"),
(USB_DATA_ZBT1, "Home Assistant Connect ZBT-1"),
],
)
async def test_duplicate_usb_discovery_aborts_early(
usb_data: UsbServiceInfo, model: str, hass: HomeAssistant
) -> None:
"""Test USB discovery aborts early when unique_id exists before serial path resolution."""
# Create existing config entry
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
"firmware": "ezsp",
"device": "/dev/oldpath",
"manufacturer": usb_data.manufacturer,
"pid": usb_data.pid,
"description": usb_data.description,
"product": usb_data.description,
"serial_number": usb_data.serial_number,
"vid": usb_data.vid,
},
unique_id=(
f"{usb_data.vid}:{usb_data.pid}_"
f"{usb_data.serial_number}_"
f"{usb_data.manufacturer}_"
f"{usb_data.description}"
),
)
config_entry.add_to_hass(hass)
# Try to discover the same device with a different path
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "usb"}, data=usb_data
)
# Should abort before get_serial_by_id is called
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
@pytest.mark.parametrize(
("usb_data", "model"),
[
(USB_DATA_SKY, "Home Assistant SkyConnect"),
(USB_DATA_ZBT1, "Home Assistant Connect ZBT-1"),
],
)
async def test_firmware_callback_updates_existing_entry(
usb_data: UsbServiceInfo, model: str, hass: HomeAssistant
) -> None:
"""Test that firmware notification updates existing config entry device path."""
await async_setup_component(hass, "homeassistant_hardware", {})
await async_setup_component(hass, "usb", {})
# Create existing config entry with old device path
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
"firmware": ApplicationType.EZSP.value,
"firmware_version": "7.4.4.0",
"device": "/dev/oldpath",
"vid": usb_data.vid,
"pid": usb_data.pid,
"serial_number": usb_data.serial_number,
"manufacturer": usb_data.manufacturer,
"description": usb_data.description,
"product": usb_data.description,
},
unique_id=(
f"{usb_data.vid}:{usb_data.pid}_"
f"{usb_data.serial_number}_"
f"{usb_data.manufacturer}_"
f"{usb_data.description}"
),
)
config_entry.add_to_hass(hass)
usb_device = USBDevice(
device=usb_data.device,
vid=usb_data.vid,
pid=usb_data.pid,
serial_number=usb_data.serial_number,
manufacturer=usb_data.manufacturer,
description=usb_data.description,
)
with patch(
"homeassistant.components.homeassistant_hardware.helpers.usb_device_from_path",
return_value=usb_device,
):
await async_notify_firmware_info(
hass,
"zha",
FirmwareInfo(
device=usb_data.device,
firmware_type=ApplicationType.EZSP,
firmware_version="7.4.4.0",
owners=[],
source="zha",
),
)
await hass.async_block_till_done()
# The config entry device path should be updated
assert config_entry.data["device"] == usb_data.device
# No new config entry was created
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1

View File

@@ -1,6 +1,7 @@
"""Tests for the USB Discovery integration."""
import asyncio
import dataclasses
from datetime import timedelta
import logging
import os
@@ -11,6 +12,7 @@ import pytest
from homeassistant.components import usb
from homeassistant.components.usb.models import USBDevice
from homeassistant.components.usb.utils import usb_device_from_path
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.helpers.service_info.usb import UsbServiceInfo
@@ -1383,3 +1385,126 @@ async def test_register_port_event_callback_failure(
assert caplog.text.count("Error in USB port event callback") == 2
assert "Failure 1" in caplog.text
assert "Failure 2" in caplog.text
def test_usb_device_from_path_with_symlinks() -> None:
"""Test usb_device_from_path resolves devices using symlink mapping."""
# Mock /dev/serial/by-id exists and contains symlinks
entry1 = MagicMock(spec_set=os.DirEntry)
entry1.is_symlink.return_value = True
entry1.path = "/dev/serial/by-id/usb-device1"
entry2 = MagicMock(spec_set=os.DirEntry)
entry2.is_symlink.return_value = True
entry2.path = "/dev/serial/by-id/usb-device2"
def mock_realpath(path: str) -> str:
realpath_map = {
"/dev/serial/by-id/usb-device1": "/dev/ttyUSB0",
"/dev/serial/by-id/usb-device2": "/dev/ttyUSB1",
"/dev/ttyUSB0": "/dev/ttyUSB0",
"/dev/ttyUSB1": "/dev/ttyUSB1",
}
return realpath_map.get(path, path)
usb_device = USBDevice(
device="/dev/ttyUSB0",
vid="1234",
pid="5678",
serial_number="ABC123",
manufacturer="Test Manufacturer",
description="Test Device",
)
with (
patch("os.path.isdir", return_value=True),
patch("os.scandir", return_value=[entry1, entry2]),
patch("os.path.realpath", side_effect=mock_realpath),
patch(
"homeassistant.components.usb.utils.scan_serial_ports",
return_value=[usb_device],
),
):
dev_from_path = usb_device_from_path("/dev/serial/by-id/usb-device1")
# The USB device for the given path differs from the `scan_serial_ports` only by its
# `device` pointing to a symlink
assert dev_from_path == dataclasses.replace(
usb_device, device="/dev/serial/by-id/usb-device1"
)
def test_usb_device_from_path_with_realpath_match() -> None:
"""Test usb_device_from_path falls back to the original path without a symlink."""
usb_device = USBDevice(
device="/dev/ttyUSB0",
vid="1234",
pid="5678",
serial_number="ABC123",
manufacturer="Test Manufacturer",
description="Test Device",
)
with (
patch("os.path.isdir", return_value=True),
patch("os.scandir", return_value=[]),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.scan_serial_ports",
return_value=[usb_device],
),
):
dev_from_path = usb_device_from_path("/dev/ttyUSB0")
# There is no symlink for the device so we must keep using the base `/dev/` path
assert dev_from_path == usb_device
def test_usb_device_from_path_no_match() -> None:
"""Test usb_device_from_path returns None when device not found."""
usb_device = USBDevice(
device="/dev/ttyUSB0",
vid="1234",
pid="5678",
serial_number="ABC123",
manufacturer="Test Manufacturer",
description="Test Device",
)
with (
patch("os.path.isdir", return_value=True),
patch("os.scandir", return_value=[]),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.scan_serial_ports",
return_value=[usb_device],
),
):
dev_from_path = usb_device_from_path("/dev/ttyUSB99")
assert dev_from_path is None
def test_usb_device_from_path_no_by_id_dir() -> None:
"""Test usb_device_from_path when /dev/serial/by-id doesn't exist."""
usb_device = USBDevice(
device="/dev/ttyUSB0",
vid="1234",
pid="5678",
serial_number="ABC123",
manufacturer="Test Manufacturer",
description="Test Device",
)
with (
patch("os.path.isdir", return_value=False),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.scan_serial_ports",
return_value=[usb_device],
),
):
dev_from_path = usb_device_from_path("/dev/ttyUSB0")
# We have no symlinks so we use the base path
assert dev_from_path == usb_device