Remove @progress_step decorator from ZHA and Hardware integration (#155867)

Co-authored-by: TheJulianJES <TheJulianJES@users.noreply.github.com>
This commit is contained in:
puddly
2025-11-07 15:26:05 -05:00
committed by GitHub
parent 0694372c61
commit 45c0891c3b
5 changed files with 324 additions and 118 deletions

View File

@@ -28,7 +28,7 @@ from homeassistant.config_entries import (
OptionsFlow,
)
from homeassistant.core import callback
from homeassistant.data_entry_flow import AbortFlow, progress_step
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.hassio import is_hassio
@@ -97,6 +97,12 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
self.addon_uninstall_task: asyncio.Task | None = None
self.firmware_install_task: asyncio.Task[None] | None = None
self.installing_firmware_name: str | None = None
self._install_otbr_addon_task: asyncio.Task[None] | None = None
self._start_otbr_addon_task: asyncio.Task[None] | None = None
# Progress flow steps cannot abort so we need to store the abort reason and then
# re-raise it in a dedicated step
self._progress_error: AbortFlow | None = None
def _get_translation_placeholders(self) -> dict[str, str]:
"""Shared translation placeholders."""
@@ -106,6 +112,11 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
if self._probed_firmware_info is not None
else "unknown"
),
"firmware_name": (
self.installing_firmware_name
if self.installing_firmware_name is not None
else "unknown"
),
"model": self._hardware_name,
}
@@ -182,22 +193,22 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
return self.async_show_progress(
step_id=step_id,
progress_action="install_firmware",
description_placeholders={
**self._get_translation_placeholders(),
"firmware_name": firmware_name,
},
description_placeholders=self._get_translation_placeholders(),
progress_task=self.firmware_install_task,
)
try:
await self.firmware_install_task
except AbortFlow as err:
return self.async_show_progress_done(
next_step_id=err.reason,
)
self._progress_error = err
return self.async_show_progress_done(next_step_id="progress_failed")
except HomeAssistantError:
_LOGGER.exception("Failed to flash firmware")
return self.async_show_progress_done(next_step_id="firmware_install_failed")
self._progress_error = AbortFlow(
reason="fw_install_failed",
description_placeholders=self._get_translation_placeholders(),
)
return self.async_show_progress_done(next_step_id="progress_failed")
finally:
self.firmware_install_task = None
@@ -241,7 +252,10 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
_LOGGER.debug("Skipping firmware upgrade due to index download failure")
return
raise AbortFlow(reason="firmware_download_failed") from err
raise AbortFlow(
reason="fw_download_failed",
description_placeholders=self._get_translation_placeholders(),
) from err
if not firmware_install_required:
assert self._probed_firmware_info is not None
@@ -270,7 +284,10 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
return
# Otherwise, fail
raise AbortFlow(reason="firmware_download_failed") from err
raise AbortFlow(
reason="fw_download_failed",
description_placeholders=self._get_translation_placeholders(),
) from err
self._probed_firmware_info = await async_flash_silabs_firmware(
hass=self.hass,
@@ -313,41 +330,6 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
await otbr_manager.async_start_addon_waiting()
async def async_step_firmware_download_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Abort when firmware download failed."""
assert self.installing_firmware_name is not None
return self.async_abort(
reason="fw_download_failed",
description_placeholders={
**self._get_translation_placeholders(),
"firmware_name": self.installing_firmware_name,
},
)
async def async_step_firmware_install_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Abort when firmware install failed."""
assert self.installing_firmware_name is not None
return self.async_abort(
reason="fw_install_failed",
description_placeholders={
**self._get_translation_placeholders(),
"firmware_name": self.installing_firmware_name,
},
)
async def async_step_unsupported_firmware(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Abort when unsupported firmware is detected."""
return self.async_abort(
reason="unsupported_firmware",
description_placeholders=self._get_translation_placeholders(),
)
async def async_step_zigbee_installation_type(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
@@ -511,16 +493,15 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
"""Install Thread firmware."""
raise NotImplementedError
@progress_step(
description_placeholders=lambda self: {
**self._get_translation_placeholders(),
"addon_name": get_otbr_addon_manager(self.hass).addon_name,
}
)
async def async_step_install_otbr_addon(
async def async_step_progress_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Show progress dialog for installing the OTBR addon."""
"""Abort when progress step failed."""
assert self._progress_error is not None
raise self._progress_error
async def _async_install_otbr_addon(self) -> None:
"""Do the work of installing the OTBR addon."""
addon_manager = get_otbr_addon_manager(self.hass)
addon_info = await self._async_get_addon_info(addon_manager)
@@ -538,18 +519,39 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
},
) from err
return await self.async_step_finish_thread_installation()
@progress_step(
description_placeholders=lambda self: {
**self._get_translation_placeholders(),
"addon_name": get_otbr_addon_manager(self.hass).addon_name,
}
)
async def async_step_start_otbr_addon(
async def async_step_install_otbr_addon(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure OTBR to point to the SkyConnect and run the addon."""
"""Show progress dialog for installing the OTBR addon."""
if self._install_otbr_addon_task is None:
self._install_otbr_addon_task = self.hass.async_create_task(
self._async_install_otbr_addon(),
"Install OTBR addon",
)
if not self._install_otbr_addon_task.done():
return self.async_show_progress(
step_id="install_otbr_addon",
progress_action="install_otbr_addon",
description_placeholders={
**self._get_translation_placeholders(),
"addon_name": get_otbr_addon_manager(self.hass).addon_name,
},
progress_task=self._install_otbr_addon_task,
)
try:
await self._install_otbr_addon_task
except AbortFlow as err:
self._progress_error = err
return self.async_show_progress_done(next_step_id="progress_failed")
finally:
self._install_otbr_addon_task = None
return self.async_show_progress_done(next_step_id="finish_thread_installation")
async def _async_start_otbr_addon(self) -> None:
"""Do the work of starting the OTBR addon."""
try:
await self._configure_and_start_otbr_addon()
except AddonError as err:
@@ -562,7 +564,36 @@ class BaseFirmwareInstallFlow(ConfigEntryBaseFlow, ABC):
},
) from err
return await self.async_step_pre_confirm_otbr()
async def async_step_start_otbr_addon(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure OTBR to point to the SkyConnect and run the addon."""
if self._start_otbr_addon_task is None:
self._start_otbr_addon_task = self.hass.async_create_task(
self._async_start_otbr_addon(),
"Start OTBR addon",
)
if not self._start_otbr_addon_task.done():
return self.async_show_progress(
step_id="start_otbr_addon",
progress_action="start_otbr_addon",
description_placeholders={
**self._get_translation_placeholders(),
"addon_name": get_otbr_addon_manager(self.hass).addon_name,
},
progress_task=self._start_otbr_addon_task,
)
try:
await self._start_otbr_addon_task
except AbortFlow as err:
self._progress_error = err
return self.async_show_progress_done(next_step_id="progress_failed")
finally:
self._start_otbr_addon_task = None
return self.async_show_progress_done(next_step_id="pre_confirm_otbr")
async def async_step_pre_confirm_otbr(
self, user_input: dict[str, Any] | None = None

View File

@@ -39,7 +39,7 @@ from homeassistant.config_entries import (
)
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import AbortFlow, progress_step
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.selector import FileSelector, FileSelectorConfig
@@ -191,8 +191,14 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
self._hass = None # type: ignore[assignment]
self._radio_mgr = ZhaRadioManager()
self._restore_backup_task: asyncio.Task[None] | None = None
self._reset_old_radio_task: asyncio.Task[None] | None = None
self._form_network_task: asyncio.Task[None] | None = None
self._extra_network_config: dict[str, Any] = {}
# Progress flow steps cannot abort so we need to store the abort reason and then
# re-raise it in a dedicated step
self._progress_error: AbortFlow | None = None
@property
def hass(self) -> HomeAssistant:
"""Return hass."""
@@ -224,6 +230,13 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
async def _async_create_radio_entry(self) -> ConfigFlowResult:
"""Create a config entry with the current flow state."""
async def async_step_progress_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Abort when progress step failed."""
assert self._progress_error is not None
raise self._progress_error
async def async_step_choose_serial_port(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
@@ -464,7 +477,22 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
self._radio_mgr.chosen_backup = self._radio_mgr.backups[0]
return await self.async_step_maybe_reset_old_radio()
@progress_step()
async def _async_reset_old_radio(self, config_entry: ConfigEntry) -> None:
"""Do the work of resetting the old radio."""
# Unload ZHA before connecting to the old adapter
with suppress(OperationNotAllowed):
await self.hass.config_entries.async_unload(config_entry.entry_id)
# Create a radio manager to connect to the old stick to reset it
temp_radio_mgr = ZhaRadioManager()
temp_radio_mgr.hass = self.hass
temp_radio_mgr.device_path = config_entry.data[CONF_DEVICE][CONF_DEVICE_PATH]
temp_radio_mgr.device_settings = config_entry.data[CONF_DEVICE]
temp_radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]]
await temp_radio_mgr.async_reset_adapter()
async def async_step_maybe_reset_old_radio(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
@@ -475,30 +503,36 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
DOMAIN, include_ignore=False
)
if config_entries:
if not config_entries:
return await self.async_step_restore_backup()
if self._reset_old_radio_task is None:
# This will only ever be called during migration, so there must be an
# existing config entry
assert len(config_entries) == 1
config_entry = config_entries[0]
# Unload ZHA before connecting to the old adapter
with suppress(OperationNotAllowed):
await self.hass.config_entries.async_unload(config_entry.entry_id)
self._reset_old_radio_task = self.hass.async_create_task(
self._async_reset_old_radio(config_entry),
"Reset old radio",
)
# Create a radio manager to connect to the old stick to reset it
temp_radio_mgr = ZhaRadioManager()
temp_radio_mgr.hass = self.hass
temp_radio_mgr.device_path = config_entry.data[CONF_DEVICE][
CONF_DEVICE_PATH
]
temp_radio_mgr.device_settings = config_entry.data[CONF_DEVICE]
temp_radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]]
if not self._reset_old_radio_task.done():
return self.async_show_progress(
step_id="maybe_reset_old_radio",
progress_action="maybe_reset_old_radio",
progress_task=self._reset_old_radio_task,
)
try:
await temp_radio_mgr.async_reset_adapter()
except HomeAssistantError:
# Old adapter not found or cannot connect, show prompt to plug back in
return await self.async_step_plug_in_old_radio()
try:
await self._reset_old_radio_task
except HomeAssistantError:
# Old adapter not found or cannot connect, show prompt to plug back in
return self.async_show_progress_done(next_step_id="plug_in_old_radio")
finally:
self._reset_old_radio_task = None
return await self.async_step_restore_backup()
return self.async_show_progress_done(next_step_id="restore_backup")
async def async_step_plug_in_old_radio(
self, user_input: dict[str, Any] | None = None
@@ -618,16 +652,35 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
# This step exists only for translations, it does nothing new
return await self.async_step_form_new_network(user_input)
@progress_step()
async def _async_form_new_network(self) -> None:
"""Do the work of forming a new network."""
await self._radio_mgr.async_form_network(config=self._extra_network_config)
# Load the newly formed network settings to get the network info
await self._radio_mgr.async_load_network_settings()
async def async_step_form_new_network(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Form a brand-new network."""
await self._radio_mgr.async_form_network(config=self._extra_network_config)
if self._form_network_task is None:
self._form_network_task = self.hass.async_create_task(
self._async_form_new_network(),
"Form new network",
)
# Load the newly formed network settings to get the network info
await self._radio_mgr.async_load_network_settings()
return await self._async_create_radio_entry()
if not self._form_network_task.done():
return self.async_show_progress(
step_id="form_new_network",
progress_action="form_new_network",
progress_task=self._form_network_task,
)
try:
await self._form_network_task
finally:
self._form_network_task = None
return self.async_show_progress_done(next_step_id="create_entry")
def _parse_uploaded_backup(
self, uploaded_file_id: str
@@ -735,10 +788,11 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
# User unplugged the new adapter, allow retry
return self.async_show_progress_done(next_step_id="pre_plug_in_new_radio")
except CannotWriteNetworkSettings as exc:
return self.async_abort(
self._progress_error = AbortFlow(
reason="cannot_restore_backup",
description_placeholders={"error": str(exc)},
)
return self.async_show_progress_done(next_step_id="progress_failed")
finally:
self._restore_backup_task = None

View File

@@ -792,10 +792,11 @@ async def test_config_flow_thread(
assert pick_result["type"] is FlowResultType.SHOW_PROGRESS
assert pick_result["progress_action"] == "install_firmware"
assert pick_result["step_id"] == "install_thread_firmware"
description_placeholders = pick_result["description_placeholders"]
assert description_placeholders is not None
assert description_placeholders["firmware_type"] == "ezsp"
assert description_placeholders["model"] == TEST_HARDWARE_NAME
assert pick_result["description_placeholders"] == {
"firmware_type": "ezsp",
"model": TEST_HARDWARE_NAME,
"firmware_name": "Thread",
}
await hass.async_block_till_done(wait_background_tasks=True)
@@ -919,10 +920,11 @@ async def test_options_flow_zigbee_to_thread(
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "pick_firmware"
description_placeholders = result["description_placeholders"]
assert description_placeholders is not None
assert description_placeholders["firmware_type"] == "ezsp"
assert description_placeholders["model"] == TEST_HARDWARE_NAME
assert result["description_placeholders"] == {
"firmware_type": "ezsp",
"model": TEST_HARDWARE_NAME,
"firmware_name": "unknown",
}
result = await hass.config_entries.options.async_configure(
result["flow_id"],
@@ -995,10 +997,11 @@ async def test_options_flow_thread_to_zigbee(hass: HomeAssistant) -> None:
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "pick_firmware"
description_placeholders = result["description_placeholders"]
assert description_placeholders is not None
assert description_placeholders["firmware_type"] == "spinel"
assert description_placeholders["model"] == TEST_HARDWARE_NAME
assert result["description_placeholders"] == {
"firmware_type": "spinel",
"model": TEST_HARDWARE_NAME,
"firmware_name": "unknown",
}
with mock_firmware_info(
probe_app_type=ApplicationType.SPINEL,

View File

@@ -69,6 +69,11 @@ async def test_config_flow_thread_not_hassio(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "not_hassio_thread"
assert result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "spinel",
"firmware_name": "Thread",
}
@pytest.mark.parametrize(
@@ -110,6 +115,12 @@ async def test_config_flow_thread_addon_info_fails(
# Cannot get addon info
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "addon_info_failed"
assert result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "spinel",
"firmware_name": "Thread",
"addon_name": "OpenThread Border Router",
}
@pytest.mark.usefixtures("addon_not_installed")
@@ -155,6 +166,12 @@ async def test_config_flow_thread_addon_install_fails(
# Cannot install addon
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "addon_install_failed"
assert result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "ezsp",
"firmware_name": "Thread",
"addon_name": "OpenThread Border Router",
}
@pytest.mark.usefixtures("addon_installed")
@@ -196,6 +213,12 @@ async def test_config_flow_thread_addon_set_config_fails(
assert pick_thread_progress_result["type"] == FlowResultType.ABORT
assert pick_thread_progress_result["reason"] == "addon_set_config_failed"
assert pick_thread_progress_result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "ezsp",
"firmware_name": "Thread",
"addon_name": "OpenThread Border Router",
}
@pytest.mark.usefixtures("addon_installed")
@@ -236,6 +259,12 @@ async def test_config_flow_thread_flasher_run_fails(
assert pick_thread_progress_result["type"] == FlowResultType.ABORT
assert pick_thread_progress_result["reason"] == "addon_start_failed"
assert pick_thread_progress_result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "ezsp",
"firmware_name": "Thread",
"addon_name": "OpenThread Border Router",
}
@pytest.mark.usefixtures("addon_running")
@@ -273,6 +302,11 @@ async def test_config_flow_thread_confirmation_fails(hass: HomeAssistant) -> Non
assert pick_thread_progress_result["type"] is FlowResultType.ABORT
assert pick_thread_progress_result["reason"] == "fw_install_failed"
assert pick_thread_progress_result["description_placeholders"] == {
"firmware_name": "Thread",
"model": TEST_HARDWARE_NAME,
"firmware_type": "ezsp",
}
@pytest.mark.parametrize(
@@ -310,6 +344,11 @@ async def test_config_flow_firmware_index_download_fails_and_required(
assert pick_result["type"] is FlowResultType.ABORT
assert pick_result["reason"] == "fw_download_failed"
assert pick_result["description_placeholders"] == {
"firmware_name": "Zigbee",
"model": TEST_HARDWARE_NAME,
"firmware_type": "spinel",
}
@pytest.mark.parametrize(
@@ -347,6 +386,11 @@ async def test_config_flow_firmware_download_fails_and_required(
assert pick_result["type"] is FlowResultType.ABORT
assert pick_result["reason"] == "fw_download_failed"
assert pick_result["description_placeholders"] == {
"firmware_name": "Zigbee",
"model": TEST_HARDWARE_NAME,
"firmware_type": "spinel",
}
@pytest.mark.parametrize(
@@ -395,6 +439,11 @@ async def test_options_flow_zigbee_to_thread_zha_configured(
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "zha_still_using_stick"
assert result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "ezsp",
"firmware_name": "unknown",
}
@pytest.mark.parametrize(
@@ -442,3 +491,8 @@ async def test_options_flow_thread_to_zigbee_otbr_configured(
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "otbr_still_using_stick"
assert result["description_placeholders"] == {
"model": TEST_HARDWARE_NAME,
"firmware_type": "spinel",
"firmware_name": "unknown",
}

View File

@@ -215,6 +215,15 @@ async def consume_progress_flow(
return result
class DelayedAsyncMock(AsyncMock):
"""AsyncMock that waits a moment before returning, useful for progress steps."""
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Overridden `__call__` with an added delay."""
await asyncio.sleep(0)
return await super().__call__(*args, **kwargs)
@pytest.mark.parametrize(
("entry_name", "unique_id", "radio_type", "service_info"),
[
@@ -720,6 +729,7 @@ async def test_migration_strategy_recommended_cannot_write(
with patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.restore_backup",
new_callable=DelayedAsyncMock,
side_effect=CannotWriteNetworkSettings("test error"),
) as mock_restore_backup:
result_migrate = await hass.config_entries.flow.async_configure(
@@ -1735,7 +1745,7 @@ async def test_strategy_no_network_settings(
advanced_pick_radio: RadioPicker, mock_app: AsyncMock, hass: HomeAssistant
) -> None:
"""Test formation strategy when no network settings are present."""
mock_app.load_network_info = MagicMock(side_effect=NetworkNotFormed())
mock_app.load_network_info = DelayedAsyncMock(side_effect=NetworkNotFormed())
result = await advanced_pick_radio(RadioType.ezsp)
assert (
@@ -1773,7 +1783,7 @@ async def test_formation_strategy_form_initial_network(
) -> None:
"""Test forming a new network, with no previous settings on the radio."""
# Initially, no network is formed
mock_app.load_network_info = AsyncMock(side_effect=NetworkNotFormed())
mock_app.load_network_info = DelayedAsyncMock(side_effect=NetworkNotFormed())
# After form_network is called, load_network_info should return the network settings
async def form_network_side_effect(*args, **kwargs):
@@ -1807,7 +1817,7 @@ async def test_onboarding_auto_formation_new_hardware(
) -> None:
"""Test auto network formation with new hardware during onboarding."""
# Initially, no network is formed
mock_app.load_network_info = AsyncMock(side_effect=NetworkNotFormed())
mock_app.load_network_info = DelayedAsyncMock(side_effect=NetworkNotFormed())
mock_app.get_device = MagicMock(return_value=MagicMock(spec=zigpy.device.Device))
# After form_network is called, load_network_info should return the network settings
@@ -1951,6 +1961,7 @@ async def test_formation_strategy_restore_manual_backup_overwrite_ieee_ezsp(
),
patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.restore_backup",
new_callable=DelayedAsyncMock,
side_effect=[
DestructiveWriteNetworkSettings("Radio IEEE change is permanent"),
None,
@@ -1981,8 +1992,14 @@ async def test_formation_strategy_restore_manual_backup_overwrite_ieee_ezsp(
user_input={config_flow.OVERWRITE_COORDINATOR_IEEE: True},
)
assert result_confirm["type"] is FlowResultType.CREATE_ENTRY
assert result_confirm["data"][CONF_RADIO_TYPE] == "ezsp"
result_final = await consume_progress_flow(
hass,
flow_id=result_confirm["flow_id"],
valid_step_ids=("restore_backup",),
)
assert result_final["type"] is FlowResultType.CREATE_ENTRY
assert result_final["data"][CONF_RADIO_TYPE] == "ezsp"
assert mock_restore_backup.call_count == 1
assert mock_restore_backup.mock_calls[0].kwargs["overwrite_ieee"] is True
@@ -2014,6 +2031,7 @@ async def test_formation_strategy_restore_manual_backup_ezsp(
),
patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.restore_backup",
new_callable=DelayedAsyncMock,
side_effect=[
DestructiveWriteNetworkSettings("Radio IEEE change is permanent"),
None,
@@ -2316,6 +2334,7 @@ async def test_options_flow_defaults(
# ZHA gets unloaded
with patch(
"homeassistant.config_entries.ConfigEntries.async_unload",
new_callable=DelayedAsyncMock,
side_effect=[async_unload_effect],
) as mock_async_unload:
result1 = await hass.config_entries.options.async_configure(
@@ -2853,6 +2872,7 @@ async def test_config_flow_port_no_multiprotocol(hass: HomeAssistant) -> None:
patch("homeassistant.components.zha.config_flow.is_hassio", return_value=True),
patch(
"homeassistant.components.hassio.addon_manager.AddonManager.async_get_addon_info",
new_callable=DelayedAsyncMock,
side_effect=AddonError,
),
patch(
@@ -3075,6 +3095,7 @@ async def test_formation_strategy_restore_manual_backup_overwrite_ieee_ezsp_writ
),
patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.restore_backup",
new_callable=DelayedAsyncMock,
side_effect=[
DestructiveWriteNetworkSettings("Radio IEEE change is permanent"),
CannotWriteNetworkSettings("Failed to write settings"),
@@ -3100,11 +3121,17 @@ async def test_formation_strategy_restore_manual_backup_overwrite_ieee_ezsp_writ
assert confirm_restore_result["type"] is FlowResultType.FORM
assert confirm_restore_result["step_id"] == "confirm_ezsp_ieee_overwrite"
final_result = await hass.config_entries.flow.async_configure(
confirm_result = await hass.config_entries.flow.async_configure(
confirm_restore_result["flow_id"],
user_input={config_flow.OVERWRITE_COORDINATOR_IEEE: True},
)
final_result = await consume_progress_flow(
hass,
flow_id=confirm_result["flow_id"],
valid_step_ids=("restore_backup",),
)
assert final_result["type"] is FlowResultType.ABORT
assert final_result["reason"] == "cannot_restore_backup"
assert (
@@ -3191,6 +3218,7 @@ async def test_plug_in_new_radio_retry(
),
patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.restore_backup",
new_callable=DelayedAsyncMock,
side_effect=[
HomeAssistantError(
"Failed to connect to Zigbee adapter: [Errno 2] No such file or directory"
@@ -3203,43 +3231,67 @@ async def test_plug_in_new_radio_retry(
],
) as mock_restore_backup,
):
result3 = await hass.config_entries.flow.async_configure(
upload_result = await hass.config_entries.flow.async_configure(
result2["flow_id"],
user_input={config_flow.UPLOADED_BACKUP_FILE: str(uuid.uuid4())},
)
result3 = await consume_progress_flow(
hass,
flow_id=upload_result["flow_id"],
valid_step_ids=("restore_backup",),
)
# Prompt user to plug old adapter back in when restore fails
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "plug_in_new_radio"
assert result3["description_placeholders"] == {"device_path": "/dev/ttyUSB1234"}
# Submit retry attempt with plugged in adapter
result4 = await hass.config_entries.flow.async_configure(
retry_result = await hass.config_entries.flow.async_configure(
result3["flow_id"],
user_input={},
)
result4 = await consume_progress_flow(
hass,
flow_id=retry_result["flow_id"],
valid_step_ids=("restore_backup",),
)
# This adapter requires user confirmation for restore
assert result4["type"] is FlowResultType.FORM
assert result4["step_id"] == "confirm_ezsp_ieee_overwrite"
# Confirm destructive rewrite, but adapter is unplugged again
result5 = await hass.config_entries.flow.async_configure(
result3["flow_id"],
confirm_result = await hass.config_entries.flow.async_configure(
result4["flow_id"],
user_input={config_flow.OVERWRITE_COORDINATOR_IEEE: True},
)
result5 = await consume_progress_flow(
hass,
flow_id=confirm_result["flow_id"],
valid_step_ids=("restore_backup",),
)
# Prompt user to plug old adapter back in again
assert result5["type"] is FlowResultType.FORM
assert result5["step_id"] == "plug_in_new_radio"
assert result5["description_placeholders"] == {"device_path": "/dev/ttyUSB1234"}
# User confirms they plugged in the adapter
result6 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
final_retry_result = await hass.config_entries.flow.async_configure(
result5["flow_id"],
user_input={},
)
result6 = await consume_progress_flow(
hass,
flow_id=final_retry_result["flow_id"],
valid_step_ids=("restore_backup",),
)
# Entry created successfully
assert result6["type"] is FlowResultType.CREATE_ENTRY
assert result6["data"][CONF_RADIO_TYPE] == "ezsp"
@@ -3277,7 +3329,7 @@ async def test_plug_in_old_radio_retry(hass: HomeAssistant, backup, mock_app) ->
)
mock_temp_radio_mgr = AsyncMock()
mock_temp_radio_mgr.async_reset_adapter = AsyncMock(
mock_temp_radio_mgr.async_reset_adapter = DelayedAsyncMock(
side_effect=HomeAssistantError(
"Failed to connect to Zigbee adapter: [Errno 2] No such file or directory"
)
@@ -3303,11 +3355,17 @@ async def test_plug_in_old_radio_retry(hass: HomeAssistant, backup, mock_app) ->
assert result_confirm["step_id"] == "choose_migration_strategy"
result_recommended = await hass.config_entries.flow.async_configure(
recommended_result = await hass.config_entries.flow.async_configure(
result_confirm["flow_id"],
user_input={"next_step_id": config_flow.MIGRATION_STRATEGY_RECOMMENDED},
)
result_recommended = await consume_progress_flow(
hass,
flow_id=recommended_result["flow_id"],
valid_step_ids=("maybe_reset_old_radio",),
)
# Prompt user to plug old adapter back in when reset fails
assert result_recommended["type"] is FlowResultType.MENU
assert result_recommended["step_id"] == "plug_in_old_radio"
@@ -3321,11 +3379,17 @@ async def test_plug_in_old_radio_retry(hass: HomeAssistant, backup, mock_app) ->
]
# Retry with unplugged adapter
result_retry = await hass.config_entries.flow.async_configure(
retry_result = await hass.config_entries.flow.async_configure(
result_recommended["flow_id"],
user_input={"next_step_id": "retry_old_radio"},
)
result_retry = await consume_progress_flow(
hass,
flow_id=retry_result["flow_id"],
valid_step_ids=("maybe_reset_old_radio",),
)
# Prompt user again to plug old adapter back in
assert result_retry["type"] is FlowResultType.MENU
assert result_retry["step_id"] == "plug_in_old_radio"