Energyid integration (#138206)

Co-authored-by: Jan Pecinovsky <jan.pecinovsky@energieid.be>
Co-authored-by: Jan Pecinovsky <janpecinovsky@gmail.com>
Co-authored-by: Norbert Rittel <norbert@rittel.de>
Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
Oscar
2025-11-26 13:38:57 +01:00
committed by GitHub
parent 13bc0ebed8
commit 6baf77d256
20 changed files with 3981 additions and 0 deletions

View File

@@ -187,6 +187,7 @@ homeassistant.components.elkm1.*
homeassistant.components.emulated_hue.*
homeassistant.components.energenie_power_sockets.*
homeassistant.components.energy.*
homeassistant.components.energyid.*
homeassistant.components.energyzero.*
homeassistant.components.enigma2.*
homeassistant.components.enphase_envoy.*

2
CODEOWNERS generated
View File

@@ -452,6 +452,8 @@ build.json @home-assistant/supervisor
/tests/components/energenie_power_sockets/ @gnumpi
/homeassistant/components/energy/ @home-assistant/core
/tests/components/energy/ @home-assistant/core
/homeassistant/components/energyid/ @JrtPec @Molier
/tests/components/energyid/ @JrtPec @Molier
/homeassistant/components/energyzero/ @klaasnicolaas
/tests/components/energyzero/ @klaasnicolaas
/homeassistant/components/enigma2/ @autinerd

View File

@@ -0,0 +1,401 @@
"""The EnergyID integration."""
from __future__ import annotations
from dataclasses import dataclass
import datetime as dt
from datetime import timedelta
import functools
import logging
from aiohttp import ClientError, ClientResponseError
from energyid_webhooks.client_v2 import WebhookClient
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import (
CALLBACK_TYPE,
Event,
EventStateChangedData,
HomeAssistant,
callback,
)
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import (
async_track_entity_registry_updated_event,
async_track_state_change_event,
async_track_time_interval,
)
from .const import (
CONF_DEVICE_ID,
CONF_DEVICE_NAME,
CONF_ENERGYID_KEY,
CONF_HA_ENTITY_UUID,
CONF_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
type EnergyIDConfigEntry = ConfigEntry[EnergyIDRuntimeData]
DEFAULT_UPLOAD_INTERVAL_SECONDS = 60
@dataclass
class EnergyIDRuntimeData:
"""Runtime data for the EnergyID integration."""
client: WebhookClient
mappings: dict[str, str]
state_listener: CALLBACK_TYPE | None = None
registry_tracking_listener: CALLBACK_TYPE | None = None
unavailable_logged: bool = False
async def async_setup_entry(hass: HomeAssistant, entry: EnergyIDConfigEntry) -> bool:
"""Set up EnergyID from a config entry."""
session = async_get_clientsession(hass)
client = WebhookClient(
provisioning_key=entry.data[CONF_PROVISIONING_KEY],
provisioning_secret=entry.data[CONF_PROVISIONING_SECRET],
device_id=entry.data[CONF_DEVICE_ID],
device_name=entry.data[CONF_DEVICE_NAME],
session=session,
)
entry.runtime_data = EnergyIDRuntimeData(
client=client,
mappings={},
)
is_claimed = None
try:
is_claimed = await client.authenticate()
except TimeoutError as err:
raise ConfigEntryNotReady(
f"Timeout authenticating with EnergyID: {err}"
) from err
except ClientResponseError as err:
# 401/403 = invalid credentials, trigger reauth
if err.status in (401, 403):
raise ConfigEntryAuthFailed(f"Invalid credentials: {err}") from err
# Other HTTP errors are likely temporary
raise ConfigEntryNotReady(
f"HTTP error authenticating with EnergyID: {err}"
) from err
except ClientError as err:
# Network/connection errors are temporary
raise ConfigEntryNotReady(
f"Connection error authenticating with EnergyID: {err}"
) from err
except Exception as err:
# Unknown errors - log and retry (safer than forcing reauth)
_LOGGER.exception("Unexpected error during EnergyID authentication")
raise ConfigEntryNotReady(
f"Unexpected error authenticating with EnergyID: {err}"
) from err
if not is_claimed:
# Device exists but not claimed = user needs to claim it = auth issue
raise ConfigEntryAuthFailed("Device is not claimed. Please re-authenticate.")
_LOGGER.debug("EnergyID device '%s' authenticated successfully", client.device_name)
async def _async_synchronize_sensors(now: dt.datetime | None = None) -> None:
"""Callback for periodically synchronizing sensor data."""
try:
await client.synchronize_sensors()
if entry.runtime_data.unavailable_logged:
_LOGGER.debug("Connection to EnergyID re-established")
entry.runtime_data.unavailable_logged = False
except (OSError, RuntimeError) as err:
if not entry.runtime_data.unavailable_logged:
_LOGGER.debug("EnergyID is unavailable: %s", err)
entry.runtime_data.unavailable_logged = True
upload_interval = DEFAULT_UPLOAD_INTERVAL_SECONDS
if client.webhook_policy:
upload_interval = client.webhook_policy.get(
"uploadInterval", DEFAULT_UPLOAD_INTERVAL_SECONDS
)
# Schedule the callback and automatically unsubscribe when the entry is unloaded.
entry.async_on_unload(
async_track_time_interval(
hass, _async_synchronize_sensors, timedelta(seconds=upload_interval)
)
)
entry.async_on_unload(entry.add_update_listener(config_entry_update_listener))
update_listeners(hass, entry)
_LOGGER.debug(
"Starting EnergyID background sync for '%s'",
client.device_name,
)
return True
async def config_entry_update_listener(
hass: HomeAssistant, entry: EnergyIDConfigEntry
) -> None:
"""Handle config entry updates, including subentry changes."""
_LOGGER.debug("Config entry updated for %s, reloading listeners", entry.entry_id)
update_listeners(hass, entry)
@callback
def update_listeners(hass: HomeAssistant, entry: EnergyIDConfigEntry) -> None:
"""Set up or update state listeners and queue initial states."""
runtime_data = entry.runtime_data
client = runtime_data.client
# Clean up old state listener
if runtime_data.state_listener:
runtime_data.state_listener()
runtime_data.state_listener = None
mappings: dict[str, str] = {}
entities_to_track: list[str] = []
old_mappings = set(runtime_data.mappings.keys())
new_mappings = set()
ent_reg = er.async_get(hass)
subentries = list(entry.subentries.values())
_LOGGER.debug(
"Found %d subentries in entry.subentries: %s",
len(subentries),
[s.data for s in subentries],
)
# Build current entity mappings
tracked_entity_ids = []
for subentry in subentries:
entity_uuid = subentry.data.get(CONF_HA_ENTITY_UUID)
energyid_key = subentry.data.get(CONF_ENERGYID_KEY)
if not (entity_uuid and energyid_key):
continue
entity_entry = ent_reg.async_get(entity_uuid)
if not entity_entry:
_LOGGER.warning(
"Entity with UUID %s does not exist, skipping mapping to %s",
entity_uuid,
energyid_key,
)
continue
ha_entity_id = entity_entry.entity_id
tracked_entity_ids.append(ha_entity_id)
if not hass.states.get(ha_entity_id):
# Entity exists in registry but is not present in the state machine
_LOGGER.debug(
"Entity %s does not exist in state machine yet, will track when available (mapping to %s)",
ha_entity_id,
energyid_key,
)
# Still add to entities_to_track so we can handle it when state appears
entities_to_track.append(ha_entity_id)
continue
mappings[ha_entity_id] = energyid_key
entities_to_track.append(ha_entity_id)
new_mappings.add(ha_entity_id)
client.get_or_create_sensor(energyid_key)
if ha_entity_id not in old_mappings:
_LOGGER.debug(
"New mapping detected for %s, queuing initial state", ha_entity_id
)
if (
current_state := hass.states.get(ha_entity_id)
) and current_state.state not in (
STATE_UNKNOWN,
STATE_UNAVAILABLE,
):
try:
value = float(current_state.state)
timestamp = current_state.last_updated or dt.datetime.now(dt.UTC)
client.get_or_create_sensor(energyid_key).update(value, timestamp)
except (ValueError, TypeError):
_LOGGER.debug(
"Could not convert initial state of %s to float: %s",
ha_entity_id,
current_state.state,
)
# Clean up old entity registry listener
if runtime_data.registry_tracking_listener:
runtime_data.registry_tracking_listener()
runtime_data.registry_tracking_listener = None
# Set up listeners for entity registry changes
if tracked_entity_ids:
_LOGGER.debug("Setting up entity registry tracking for: %s", tracked_entity_ids)
def _handle_entity_registry_change(
event: Event[er.EventEntityRegistryUpdatedData],
) -> None:
"""Handle entity registry changes for our tracked entities."""
_LOGGER.debug("Registry event for tracked entity: %s", event.data)
if event.data["action"] == "update":
# Type is now narrowed to _EventEntityRegistryUpdatedData_Update
if "entity_id" in event.data["changes"]:
old_entity_id = event.data["changes"]["entity_id"]
new_entity_id = event.data["entity_id"]
_LOGGER.debug(
"Tracked entity ID changed: %s -> %s",
old_entity_id,
new_entity_id,
)
# Entity ID changed, need to reload listeners to track new ID
update_listeners(hass, entry)
elif event.data["action"] == "remove":
_LOGGER.debug("Tracked entity removed: %s", event.data["entity_id"])
# reminder: Create repair issue to notify user about removed entity
update_listeners(hass, entry)
# Track the specific entity IDs we care about
unsub_entity_registry = async_track_entity_registry_updated_event(
hass, tracked_entity_ids, _handle_entity_registry_change
)
runtime_data.registry_tracking_listener = unsub_entity_registry
if removed_mappings := old_mappings - new_mappings:
_LOGGER.debug("Removed mappings: %s", ", ".join(removed_mappings))
runtime_data.mappings = mappings
if not entities_to_track:
_LOGGER.debug(
"No valid sensor mappings configured for '%s'", client.device_name
)
return
unsub_state_change = async_track_state_change_event(
hass,
entities_to_track,
functools.partial(_async_handle_state_change, hass, entry.entry_id),
)
runtime_data.state_listener = unsub_state_change
_LOGGER.debug(
"Now tracking state changes for %d entities for '%s': %s",
len(entities_to_track),
client.device_name,
entities_to_track,
)
@callback
def _async_handle_state_change(
hass: HomeAssistant, entry_id: str, event: Event[EventStateChangedData]
) -> None:
"""Handle state changes for tracked entities."""
entity_id = event.data["entity_id"]
new_state = event.data["new_state"]
_LOGGER.debug(
"State change detected for entity: %s, new value: %s",
entity_id,
new_state.state if new_state else "None",
)
if not new_state or new_state.state in (STATE_UNKNOWN, STATE_UNAVAILABLE):
return
entry = hass.config_entries.async_get_entry(entry_id)
if not entry or not hasattr(entry, "runtime_data"):
# Entry is being unloaded or not yet fully initialized
return
runtime_data = entry.runtime_data
client = runtime_data.client
# Check if entity is already mapped
if energyid_key := runtime_data.mappings.get(entity_id):
# Entity already mapped, just update value
_LOGGER.debug(
"Updating EnergyID sensor %s with value %s", energyid_key, new_state.state
)
else:
# Entity not mapped yet - check if it should be (handles late-appearing entities)
ent_reg = er.async_get(hass)
for subentry in entry.subentries.values():
entity_uuid = subentry.data.get(CONF_HA_ENTITY_UUID)
energyid_key_candidate = subentry.data.get(CONF_ENERGYID_KEY)
if not (entity_uuid and energyid_key_candidate):
continue
entity_entry = ent_reg.async_get(entity_uuid)
if entity_entry and entity_entry.entity_id == entity_id:
# Found it! Add to mappings and send initial value
energyid_key = energyid_key_candidate
runtime_data.mappings[entity_id] = energyid_key
client.get_or_create_sensor(energyid_key)
_LOGGER.debug(
"Entity %s now available in state machine, adding to mappings (key: %s)",
entity_id,
energyid_key,
)
break
else:
# Not a tracked entity, ignore
return
try:
value = float(new_state.state)
except (ValueError, TypeError):
return
client.get_or_create_sensor(energyid_key).update(value, new_state.last_updated)
async def async_unload_entry(hass: HomeAssistant, entry: EnergyIDConfigEntry) -> bool:
"""Unload a config entry."""
_LOGGER.debug("Unloading EnergyID entry for %s", entry.title)
try:
# Unload subentries if present (guarded for test and reload scenarios)
if hasattr(hass.config_entries, "async_entries") and hasattr(entry, "entry_id"):
subentries = [
e.entry_id
for e in hass.config_entries.async_entries(DOMAIN)
if getattr(e, "parent_entry", None) == entry.entry_id
]
for subentry_id in subentries:
await hass.config_entries.async_unload(subentry_id)
# Only clean up listeners and client if runtime_data is present
if hasattr(entry, "runtime_data"):
runtime_data = entry.runtime_data
# Remove state listener
if runtime_data.state_listener:
runtime_data.state_listener()
# Remove registry tracking listener
if runtime_data.registry_tracking_listener:
runtime_data.registry_tracking_listener()
try:
await runtime_data.client.close()
except Exception:
_LOGGER.exception("Error closing EnergyID client for %s", entry.title)
del entry.runtime_data
except Exception:
_LOGGER.exception("Error during async_unload_entry for %s", entry.title)
return False
return True

View File

@@ -0,0 +1,291 @@
"""Config flow for EnergyID integration."""
import asyncio
from collections.abc import Mapping
import logging
from typing import Any
from aiohttp import ClientError, ClientResponseError
from energyid_webhooks.client_v2 import WebhookClient
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentryFlow,
)
from homeassistant.core import callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.instance_id import async_get as async_get_instance_id
from .const import (
CONF_DEVICE_ID,
CONF_DEVICE_NAME,
CONF_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET,
DOMAIN,
ENERGYID_DEVICE_ID_FOR_WEBHOOK_PREFIX,
MAX_POLLING_ATTEMPTS,
NAME,
POLLING_INTERVAL,
)
from .energyid_sensor_mapping_flow import EnergyIDSensorMappingFlowHandler
_LOGGER = logging.getLogger(__name__)
class EnergyIDConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle the configuration flow for the EnergyID integration."""
def __init__(self) -> None:
"""Initialize the config flow."""
self._flow_data: dict[str, Any] = {}
self._polling_task: asyncio.Task | None = None
async def _perform_auth_and_get_details(self) -> str | None:
"""Authenticate with EnergyID and retrieve device details."""
_LOGGER.debug("Starting authentication with EnergyID")
client = WebhookClient(
provisioning_key=self._flow_data[CONF_PROVISIONING_KEY],
provisioning_secret=self._flow_data[CONF_PROVISIONING_SECRET],
device_id=self._flow_data[CONF_DEVICE_ID],
device_name=self._flow_data[CONF_DEVICE_NAME],
session=async_get_clientsession(self.hass),
)
try:
is_claimed = await client.authenticate()
except ClientResponseError as err:
if err.status == 401:
_LOGGER.debug("Invalid provisioning key or secret")
return "invalid_auth"
_LOGGER.debug(
"Client response error during EnergyID authentication: %s", err
)
return "cannot_connect"
except ClientError as err:
_LOGGER.debug(
"Failed to connect to EnergyID during authentication: %s", err
)
return "cannot_connect"
except Exception:
_LOGGER.exception("Unexpected error during EnergyID authentication")
return "unknown_auth_error"
else:
_LOGGER.debug("Authentication successful, claimed: %s", is_claimed)
if is_claimed:
self._flow_data["record_number"] = client.recordNumber
self._flow_data["record_name"] = client.recordName
_LOGGER.debug(
"Device claimed with record number: %s, record name: %s",
client.recordNumber,
client.recordName,
)
return None
self._flow_data["claim_info"] = client.get_claim_info()
self._flow_data["claim_info"]["integration_name"] = NAME
_LOGGER.debug(
"Device needs claim, claim info: %s", self._flow_data["claim_info"]
)
return "needs_claim"
async def _async_poll_for_claim(self) -> None:
"""Poll EnergyID to check if device has been claimed."""
for _attempt in range(1, MAX_POLLING_ATTEMPTS + 1):
await asyncio.sleep(POLLING_INTERVAL)
auth_status = await self._perform_auth_and_get_details()
if auth_status is None:
# Device claimed - advance flow to async_step_create_entry
_LOGGER.debug("Device claimed, advancing to create entry")
self.hass.async_create_task(
self.hass.config_entries.flow.async_configure(self.flow_id)
)
return
if auth_status != "needs_claim":
# Stop polling on non-transient errors
# No user notification needed here as the error will be handled
# in the next flow step when the user continues the flow
_LOGGER.debug("Polling stopped due to error: %s", auth_status)
return
_LOGGER.debug("Polling timeout after %s attempts", MAX_POLLING_ATTEMPTS)
# No user notification here because:
# 1. User may still be completing the claim process in EnergyID portal
# 2. Immediate notification could interrupt their workflow or cause confusion
# 3. When user clicks "Submit" to continue, the flow validates claim status
# and will show appropriate error/success messages based on current state
# 4. Timeout allows graceful fallback: user can retry claim or see proper error
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step of the configuration flow."""
_LOGGER.debug("Starting user step with input: %s", user_input)
errors: dict[str, str] = {}
if user_input is not None:
instance_id = await async_get_instance_id(self.hass)
# Note: This device_id is for EnergyID's webhook system, not related to HA's device registry
device_suffix = f"{int(asyncio.get_event_loop().time() * 1000)}"
device_id = (
f"{ENERGYID_DEVICE_ID_FOR_WEBHOOK_PREFIX}{instance_id}_{device_suffix}"
)
self._flow_data = {
**user_input,
CONF_DEVICE_ID: device_id,
CONF_DEVICE_NAME: self.hass.config.location_name,
}
_LOGGER.debug("Flow data after user input: %s", self._flow_data)
auth_status = await self._perform_auth_and_get_details()
if auth_status is None:
await self.async_set_unique_id(device_id)
self._abort_if_unique_id_configured()
_LOGGER.debug(
"Creating entry with title: %s", self._flow_data["record_name"]
)
return self.async_create_entry(
title=self._flow_data["record_name"],
data=self._flow_data,
description="add_sensor_mapping_hint",
)
if auth_status == "needs_claim":
_LOGGER.debug("Redirecting to auth and claim step")
return await self.async_step_auth_and_claim()
errors["base"] = auth_status
_LOGGER.debug("Errors encountered during user step: %s", errors)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_PROVISIONING_KEY): str,
vol.Required(CONF_PROVISIONING_SECRET): cv.string,
}
),
errors=errors,
description_placeholders={
"docs_url": "https://app.energyid.eu/integrations/home-assistant",
"integration_name": NAME,
},
)
async def async_step_auth_and_claim(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the step for device claiming using external step with polling."""
_LOGGER.debug("Starting auth and claim step with input: %s", user_input)
claim_info = self._flow_data.get("claim_info", {})
# Start polling when we first enter this step
if self._polling_task is None:
self._polling_task = self.hass.async_create_task(
self._async_poll_for_claim()
)
# Show external step to open the EnergyID website
return self.async_external_step(
step_id="auth_and_claim",
url=claim_info.get("claim_url", ""),
description_placeholders=claim_info,
)
# Check if device has been claimed
auth_status = await self._perform_auth_and_get_details()
if auth_status is None:
# Device has been claimed
if self._polling_task and not self._polling_task.done():
self._polling_task.cancel()
self._polling_task = None
return self.async_external_step_done(next_step_id="create_entry")
# Device not claimed yet, show the external step again
if self._polling_task and not self._polling_task.done():
self._polling_task.cancel()
self._polling_task = None
return self.async_external_step(
step_id="auth_and_claim",
url=claim_info.get("claim_url", ""),
description_placeholders=claim_info,
)
async def async_step_create_entry(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Final step to create the entry after successful claim."""
_LOGGER.debug("Creating entry with title: %s", self._flow_data["record_name"])
return self.async_create_entry(
title=self._flow_data["record_name"],
data=self._flow_data,
description="add_sensor_mapping_hint",
)
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Perform reauthentication upon an API authentication error."""
# Note: This device_id is for EnergyID's webhook system, not related to HA's device registry
self._flow_data = {
CONF_DEVICE_ID: entry_data[CONF_DEVICE_ID],
CONF_DEVICE_NAME: entry_data[CONF_DEVICE_NAME],
}
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication dialog."""
errors: dict[str, str] = {}
if user_input is not None:
self._flow_data.update(user_input)
auth_status = await self._perform_auth_and_get_details()
if auth_status is None:
# Authentication successful and claimed
await self.async_set_unique_id(self._flow_data["record_number"])
self._abort_if_unique_id_mismatch(reason="wrong_account")
return self.async_update_reload_and_abort(
self._get_reauth_entry(),
data_updates={
CONF_PROVISIONING_KEY: user_input[CONF_PROVISIONING_KEY],
CONF_PROVISIONING_SECRET: user_input[CONF_PROVISIONING_SECRET],
},
)
if auth_status == "needs_claim":
return await self.async_step_auth_and_claim()
errors["base"] = auth_status
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema(
{
vol.Required(CONF_PROVISIONING_KEY): str,
vol.Required(CONF_PROVISIONING_SECRET): cv.string,
}
),
errors=errors,
description_placeholders={
"docs_url": "https://app.energyid.eu/integrations/home-assistant",
"integration_name": NAME,
},
)
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return subentries supported by this integration."""
return {"sensor_mapping": EnergyIDSensorMappingFlowHandler}

View File

@@ -0,0 +1,21 @@
"""Constants for the EnergyID integration."""
from typing import Final
DOMAIN: Final = "energyid"
NAME: Final = "EnergyID"
# --- Config Flow and Entry Data ---
CONF_PROVISIONING_KEY: Final = "provisioning_key"
CONF_PROVISIONING_SECRET: Final = "provisioning_secret"
CONF_DEVICE_ID: Final = "device_id"
CONF_DEVICE_NAME: Final = "device_name"
# --- Subentry (Mapping) Data ---
CONF_HA_ENTITY_UUID: Final = "ha_entity_uuid"
CONF_ENERGYID_KEY: Final = "energyid_key"
# --- Webhook and Polling Configuration ---
ENERGYID_DEVICE_ID_FOR_WEBHOOK_PREFIX: Final = "homeassistant_eid_"
POLLING_INTERVAL: Final = 2 # seconds
MAX_POLLING_ATTEMPTS: Final = 60 # 2 minutes total

View File

@@ -0,0 +1,155 @@
"""Subentry flow for EnergyID integration, handling sensor mapping management."""
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components.sensor import SensorDeviceClass, SensorStateClass
from homeassistant.config_entries import ConfigSubentryFlow, SubentryFlowResult
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.selector import EntitySelector, EntitySelectorConfig
from .const import CONF_ENERGYID_KEY, CONF_HA_ENTITY_UUID, DOMAIN, NAME
_LOGGER = logging.getLogger(__name__)
@callback
def _get_suggested_entities(hass: HomeAssistant) -> list[str]:
"""Return a sorted list of suggested sensor entity IDs for mapping."""
ent_reg = er.async_get(hass)
suitable_entities = []
for entity_entry in ent_reg.entities.values():
if not (
entity_entry.domain == Platform.SENSOR and entity_entry.platform != DOMAIN
):
continue
if not hass.states.get(entity_entry.entity_id):
continue
state_class = (entity_entry.capabilities or {}).get("state_class")
has_numeric_indicators = (
state_class
in (
SensorStateClass.MEASUREMENT,
SensorStateClass.TOTAL,
SensorStateClass.TOTAL_INCREASING,
)
or entity_entry.device_class
in (
SensorDeviceClass.ENERGY,
SensorDeviceClass.GAS,
SensorDeviceClass.POWER,
SensorDeviceClass.TEMPERATURE,
SensorDeviceClass.VOLUME,
)
or entity_entry.original_device_class
in (
SensorDeviceClass.ENERGY,
SensorDeviceClass.GAS,
SensorDeviceClass.POWER,
SensorDeviceClass.TEMPERATURE,
SensorDeviceClass.VOLUME,
)
)
if has_numeric_indicators:
suitable_entities.append(entity_entry.entity_id)
return sorted(suitable_entities)
@callback
def _validate_mapping_input(
ha_entity_id: str | None,
current_mappings: set[str],
ent_reg: er.EntityRegistry,
) -> dict[str, str]:
"""Validate mapping input and return errors if any."""
errors: dict[str, str] = {}
if not ha_entity_id:
errors["base"] = "entity_required"
return errors
# Check if entity exists
entity_entry = ent_reg.async_get(ha_entity_id)
if not entity_entry:
errors["base"] = "entity_not_found"
return errors
# Check if entity is already mapped (by UUID)
entity_uuid = entity_entry.id
if entity_uuid in current_mappings:
errors["base"] = "entity_already_mapped"
return errors
class EnergyIDSensorMappingFlowHandler(ConfigSubentryFlow):
"""Handle EnergyID sensor mapping subentry flow for adding new mappings."""
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Handle the user step for adding a new sensor mapping."""
errors: dict[str, str] = {}
config_entry = self._get_entry()
ent_reg = er.async_get(self.hass)
if user_input is not None:
ha_entity_id = user_input.get("ha_entity_id")
# Get current mappings by UUID
current_mappings = {
sub.data[CONF_HA_ENTITY_UUID]
for sub in config_entry.subentries.values()
}
errors = _validate_mapping_input(ha_entity_id, current_mappings, ent_reg)
if not errors and ha_entity_id:
# Get entity registry entry
entity_entry = ent_reg.async_get(ha_entity_id)
if entity_entry:
energyid_key = ha_entity_id.split(".", 1)[-1]
subentry_data = {
CONF_HA_ENTITY_UUID: entity_entry.id, # Store UUID only
CONF_ENERGYID_KEY: energyid_key,
}
title = f"{ha_entity_id.split('.', 1)[-1]} connection to {NAME}"
_LOGGER.debug(
"Creating subentry with title='%s', data=%s",
title,
subentry_data,
)
_LOGGER.debug("Parent config entry ID: %s", config_entry.entry_id)
_LOGGER.debug(
"Creating subentry with parent: %s", self._get_entry().entry_id
)
return self.async_create_entry(title=title, data=subentry_data)
errors["base"] = "entity_not_found"
suggested_entities = _get_suggested_entities(self.hass)
data_schema = vol.Schema(
{
vol.Required("ha_entity_id"): EntitySelector(
EntitySelectorConfig(include_entities=suggested_entities)
),
}
)
return self.async_show_form(
step_id="user",
data_schema=data_schema,
errors=errors,
description_placeholders={"integration_name": NAME},
)

View File

@@ -0,0 +1,12 @@
{
"domain": "energyid",
"name": "EnergyID",
"codeowners": ["@JrtPec", "@Molier"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/energyid",
"integration_type": "service",
"iot_class": "cloud_push",
"loggers": ["energyid_webhooks"],
"quality_scale": "silver",
"requirements": ["energyid-webhooks==0.0.14"]
}

View File

@@ -0,0 +1,137 @@
rules:
# Bronze
action-setup:
status: exempt
comment: The integration does not expose any custom service actions.
appropriate-polling:
status: exempt
comment: The integration uses a push-based mechanism with a background sync task, not polling.
brands:
status: done
common-modules:
status: done
config-flow-test-coverage:
status: done
config-flow:
status: done
dependency-transparency:
status: done
docs-actions:
status: exempt
comment: The integration does not expose any custom service actions.
docs-high-level-description:
status: done
docs-installation-instructions:
status: done
docs-removal-instructions:
status: done
entity-event-setup:
status: exempt
comment: This integration does not create its own entities.
entity-unique-id:
status: exempt
comment: This integration does not create its own entities.
has-entity-name:
status: exempt
comment: This integration does not create its own entities.
runtime-data:
status: done
test-before-configure:
status: done
test-before-setup:
status: done
unique-config-entry:
status: done
# Silver
action-exceptions:
status: exempt
comment: The integration does not expose any custom service actions.
config-entry-unloading:
status: done
docs-configuration-parameters:
status: done
docs-installation-parameters:
status: done
entity-unavailable:
status: exempt
comment: This integration does not create its own entities.
integration-owner:
status: done
log-when-unavailable:
status: done
comment: The integration logs a single message when the EnergyID service is unavailable.
parallel-updates:
status: exempt
comment: This integration does not create its own entities.
reauthentication-flow:
status: done
test-coverage:
status: done
# Gold
devices:
status: exempt
comment: The integration does not create any entities, nor does it create devices.
diagnostics:
status: todo
comment: Diagnostics will be added in a follow-up PR to help with debugging.
discovery:
status: exempt
comment: Configuration requires manual entry of provisioning credentials.
discovery-update-info:
status: exempt
comment: No discovery mechanism is used.
docs-data-update:
status: done
docs-examples:
status: done
docs-known-limitations:
status: done
docs-supported-devices:
status: exempt
comment: This is a service integration not tied to specific device models.
docs-supported-functions:
status: done
docs-troubleshooting:
status: done
docs-use-cases:
status: done
dynamic-devices:
status: exempt
comment: The integration creates a single device entry for the service connection.
entity-category:
status: exempt
comment: This integration does not create its own entities.
entity-device-class:
status: exempt
comment: This integration does not create its own entities.
entity-disabled-by-default:
status: exempt
comment: This integration does not create its own entities.
entity-translations:
status: exempt
comment: This integration does not create its own entities.
exception-translations:
status: done
icon-translations:
status: exempt
comment: This integration does not create its own entities.
reconfiguration-flow:
status: todo
comment: Reconfiguration will be added in a follow-up PR to allow updating the device name.
repair-issues:
status: exempt
comment: Authentication issues are handled via the reauthentication flow.
stale-devices:
status: exempt
comment: Creates a single service device entry tied to the config entry.
# Platinum
async-dependency:
status: done
inject-websession:
status: done
strict-typing:
status: todo
comment: Full strict typing compliance will be addressed in a future update.

View File

@@ -0,0 +1,71 @@
{
"config": {
"abort": {
"already_configured": "This device is already configured.",
"reauth_successful": "Reauthentication successful."
},
"create_entry": {
"add_sensor_mapping_hint": "You can now add mappings from any sensor in Home Assistant to {integration_name} using the '+ add sensor mapping' button."
},
"error": {
"cannot_connect": "Failed to connect to {integration_name} API.",
"claim_failed_or_timed_out": "Claiming the device failed or the code expired.",
"invalid_auth": "Invalid provisioning key or secret.",
"unknown_auth_error": "Unexpected error occurred during authentication."
},
"step": {
"auth_and_claim": {
"description": "This Home Assistant connection needs to be claimed in your {integration_name} portal before it can send data.\n\n1. Go to: {claim_url}\n2. Enter code: **{claim_code}**\n3. (Code expires: {valid_until})\n\nAfter successfully claiming the device in {integration_name}, select **Submit** below to continue.",
"title": "Claim device in {integration_name}"
},
"reauth_confirm": {
"data": {
"provisioning_key": "[%key:component::energyid::config::step::user::data::provisioning_key%]",
"provisioning_secret": "[%key:component::energyid::config::step::user::data::provisioning_secret%]"
},
"data_description": {
"provisioning_key": "[%key:component::energyid::config::step::user::data_description::provisioning_key%]",
"provisioning_secret": "[%key:component::energyid::config::step::user::data_description::provisioning_secret%]"
},
"description": "Please re-enter your {integration_name} provisioning key and secret to restore the connection.\n\nMore info: {docs_url}",
"title": "Reauthenticate {integration_name}"
},
"user": {
"data": {
"provisioning_key": "Provisioning key",
"provisioning_secret": "Provisioning secret"
},
"data_description": {
"provisioning_key": "Your unique key for provisioning.",
"provisioning_secret": "Your secret associated with the provisioning key."
},
"description": "Enter your {integration_name} webhook provisioning key and secret. Find these in your {integration_name} integration setup under provisioning credentials.\n\nMore info: {docs_url}",
"title": "Connect to {integration_name}"
}
}
},
"config_subentries": {
"sensor_mapping": {
"entry_type": "service",
"error": {
"entity_already_mapped": "This Home Assistant entity is already mapped.",
"entity_required": "You must select a sensor entity."
},
"initiate_flow": {
"user": "Add sensor mapping"
},
"step": {
"user": {
"data": {
"ha_entity_id": "Home Assistant sensor"
},
"data_description": {
"ha_entity_id": "Select the sensor from Home Assistant to send to {integration_name}."
},
"description": "Select a Home Assistant sensor to send to {integration_name}. The sensor name will be used as the {integration_name} metric key.",
"title": "Add sensor mapping"
}
}
}
}
}

View File

@@ -186,6 +186,7 @@ FLOWS = {
"emonitor",
"emulated_roku",
"energenie_power_sockets",
"energyid",
"energyzero",
"enigma2",
"enocean",

View File

@@ -1730,6 +1730,12 @@
"integration_type": "virtual",
"supported_by": "energyzero"
},
"energyid": {
"name": "EnergyID",
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_push"
},
"energyzero": {
"name": "EnergyZero",
"integration_type": "service",

10
mypy.ini generated
View File

@@ -1626,6 +1626,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.energyid.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.energyzero.*]
check_untyped_defs = true
disallow_incomplete_defs = true

3
requirements_all.txt generated
View File

@@ -889,6 +889,9 @@ emulated-roku==0.3.0
# homeassistant.components.huisbaasje
energyflip-client==0.2.2
# homeassistant.components.energyid
energyid-webhooks==0.0.14
# homeassistant.components.energyzero
energyzero==2.1.1

View File

@@ -783,6 +783,9 @@ emulated-roku==0.3.0
# homeassistant.components.huisbaasje
energyflip-client==0.2.2
# homeassistant.components.energyid
energyid-webhooks==0.0.14
# homeassistant.components.energyzero
energyzero==2.1.1

View File

@@ -0,0 +1 @@
"""EnergyID integration test package."""

View File

@@ -0,0 +1,57 @@
"""Shared test configuration for EnergyID tests."""
from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from homeassistant.components.energyid.const import (
CONF_DEVICE_ID,
CONF_DEVICE_NAME,
CONF_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET,
DOMAIN,
)
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
@pytest.fixture
def mock_config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Return a mock config entry."""
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_PROVISIONING_KEY: "test-key",
CONF_PROVISIONING_SECRET: "test-secret",
CONF_DEVICE_ID: "test-device",
CONF_DEVICE_NAME: "Test Device",
},
entry_id="test-entry-id-123",
title="Test EnergyID Site",
)
entry.add_to_hass(hass)
return entry
@pytest.fixture
def mock_webhook_client() -> Generator[MagicMock]:
"""Mock the WebhookClient."""
with patch(
"homeassistant.components.energyid.WebhookClient", autospec=True
) as mock_client_class:
client = mock_client_class.return_value
client.authenticate = AsyncMock(return_value=True)
client.webhook_policy = {"uploadInterval": 60}
client.device_name = "Test Device"
client.synchronize_sensors = AsyncMock()
# Create a mock sensor that will be returned by get_or_create_sensor
mock_sensor = MagicMock()
mock_sensor.update = MagicMock()
# Configure get_or_create_sensor to always return the same mock sensor
client.get_or_create_sensor = MagicMock(return_value=mock_sensor)
yield client

View File

@@ -0,0 +1,26 @@
# serializer version: 1
# name: test_config_flow_user_step_success_claimed[create_entry_data]
dict({
'device_id': 'homeassistant_eid_test_instance_123',
'device_name': 'test home',
'provisioning_key': 'test_prov_key',
'provisioning_secret': 'test_prov_secret',
'record_name': 'My Test Site',
'record_number': 'site_12345',
})
# ---
# name: test_config_flow_user_step_success_claimed[user_step_form]
FlowResultSnapshot({
'description_placeholders': dict({
'docs_url': 'https://app.energyid.eu/integrations/home-assistant',
}),
'errors': dict({
}),
'flow_id': <ANY>,
'handler': 'energyid',
'last_step': None,
'preview': None,
'step_id': 'user',
'type': <FlowResultType.FORM: 'form'>,
})
# ---

View File

@@ -0,0 +1,942 @@
"""Test EnergyID config flow."""
from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp import ClientError, ClientResponseError
import pytest
from homeassistant import config_entries
from homeassistant.components.energyid.config_flow import EnergyIDConfigFlow
from homeassistant.components.energyid.const import (
CONF_DEVICE_ID,
CONF_DEVICE_NAME,
CONF_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET,
DOMAIN,
)
from homeassistant.components.energyid.energyid_sensor_mapping_flow import (
EnergyIDSensorMappingFlowHandler,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
# Test constants
TEST_PROVISIONING_KEY = "test_prov_key"
TEST_PROVISIONING_SECRET = "test_prov_secret"
TEST_RECORD_NUMBER = "site_12345"
TEST_RECORD_NAME = "My Test Site"
MAX_POLLING_ATTEMPTS = 60
@pytest.fixture(name="mock_polling_interval", autouse=True)
def mock_polling_interval_fixture() -> Generator[int]:
"""Mock polling interval to 0 for faster tests."""
with patch(
"homeassistant.components.energyid.config_flow.POLLING_INTERVAL", new=0
) as polling_interval:
yield polling_interval
async def test_config_flow_user_step_success_claimed(hass: HomeAssistant) -> None:
"""Test user step where device is already claimed."""
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=True)
mock_client.recordNumber = TEST_RECORD_NUMBER
mock_client.recordName = TEST_RECORD_NAME
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
await hass.async_block_till_done()
assert result2["type"] is FlowResultType.CREATE_ENTRY
assert result2["title"] == TEST_RECORD_NAME
assert result2["data"][CONF_PROVISIONING_KEY] == TEST_PROVISIONING_KEY
assert result2["data"][CONF_PROVISIONING_SECRET] == TEST_PROVISIONING_SECRET
assert result2["description"] == "add_sensor_mapping_hint"
# Check unique_id is set correctly
entry = hass.config_entries.async_get_entry(result2["result"].entry_id)
# For initially claimed devices, unique_id should be the device_id, not record_number
assert entry.unique_id.startswith("homeassistant_eid_")
assert CONF_DEVICE_ID in entry.data
assert entry.data[CONF_DEVICE_ID] == entry.unique_id
async def test_config_flow_auth_and_claim_step_success(hass: HomeAssistant) -> None:
"""Test auth_and_claim step where the device becomes claimed after polling."""
mock_unclaimed_client = MagicMock()
mock_unclaimed_client.authenticate = AsyncMock(return_value=False)
mock_unclaimed_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
mock_claimed_client = MagicMock()
mock_claimed_client.authenticate = AsyncMock(return_value=True)
mock_claimed_client.recordNumber = TEST_RECORD_NUMBER
mock_claimed_client.recordName = TEST_RECORD_NAME
call_count = 0
def mock_webhook_client(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return mock_unclaimed_client
return mock_claimed_client
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=mock_webhook_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
assert result_external["step_id"] == "auth_and_claim"
result_done = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_done["type"] is FlowResultType.EXTERNAL_STEP_DONE
final_result = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
await hass.async_block_till_done()
assert final_result["type"] is FlowResultType.CREATE_ENTRY
assert final_result["title"] == TEST_RECORD_NAME
assert final_result["description"] == "add_sensor_mapping_hint"
async def test_config_flow_claim_timeout(hass: HomeAssistant) -> None:
"""Test claim step when polling times out and user continues."""
mock_unclaimed_client = MagicMock()
mock_unclaimed_client.authenticate = AsyncMock(return_value=False)
mock_unclaimed_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_unclaimed_client,
),
patch(
"homeassistant.components.energyid.config_flow.asyncio.sleep",
) as mock_sleep,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
# Simulate polling timeout, then user continuing the flow
result_after_timeout = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
await hass.async_block_till_done()
# After timeout, polling stops and user continues - should see external step again
assert result_after_timeout["type"] is FlowResultType.EXTERNAL_STEP
assert result_after_timeout["step_id"] == "auth_and_claim"
# Verify polling actually ran the expected number of times
# Sleep happens at beginning of polling loop, so MAX_POLLING_ATTEMPTS + 1 sleeps
# but only MAX_POLLING_ATTEMPTS authentication attempts
assert mock_sleep.call_count == MAX_POLLING_ATTEMPTS + 1
async def test_duplicate_unique_id_prevented(hass: HomeAssistant) -> None:
"""Test that duplicate device_id (unique_id) is detected and aborted."""
# Create existing entry with a specific device_id as unique_id
# The generated device_id format is: homeassistant_eid_{instance_id}_{timestamp_ms}
# With instance_id="test_instance" and time=123.0, this becomes:
# homeassistant_eid_test_instance_123000
existing_device_id = "homeassistant_eid_test_instance_123000"
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=existing_device_id,
data={
CONF_PROVISIONING_KEY: "old_key",
CONF_PROVISIONING_SECRET: "old_secret",
CONF_DEVICE_ID: existing_device_id,
CONF_DEVICE_NAME: "Existing Device",
},
)
entry.add_to_hass(hass)
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=True)
mock_client.recordNumber = TEST_RECORD_NUMBER
mock_client.recordName = TEST_RECORD_NAME
# Mock to return the same device_id that already exists
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
),
patch(
"homeassistant.components.energyid.config_flow.async_get_instance_id",
return_value="test_instance",
),
patch(
"homeassistant.components.energyid.config_flow.asyncio.get_event_loop"
) as mock_loop,
):
# Force the same device_id to be generated
mock_loop.return_value.time.return_value = 123.0
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
# Should abort because unique_id (device_id) already exists
assert result2["type"] is FlowResultType.ABORT
assert result2["reason"] == "already_configured"
async def test_multiple_different_devices_allowed(hass: HomeAssistant) -> None:
"""Test that multiple config entries with different device_ids are allowed."""
# Create existing entry with one device_id
entry = MockConfigEntry(
domain=DOMAIN,
unique_id="homeassistant_eid_device_1",
data={
CONF_PROVISIONING_KEY: "key1",
CONF_PROVISIONING_SECRET: "secret1",
CONF_DEVICE_ID: "homeassistant_eid_device_1",
CONF_DEVICE_NAME: "Device 1",
},
)
entry.add_to_hass(hass)
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=True)
mock_client.recordNumber = TEST_RECORD_NUMBER
mock_client.recordName = TEST_RECORD_NAME
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
):
# Check initial result
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# Configure with different credentials (will create different device_id)
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: "key2",
CONF_PROVISIONING_SECRET: "secret2",
},
)
# Should succeed because device_id will be different
assert result2["type"] is FlowResultType.CREATE_ENTRY
assert result2["title"] == TEST_RECORD_NAME
assert result2["data"][CONF_PROVISIONING_KEY] == "key2"
assert result2["data"][CONF_PROVISIONING_SECRET] == "secret2"
assert result2["description"] == "add_sensor_mapping_hint"
# Verify unique_id is set
new_entry = hass.config_entries.async_get_entry(result2["result"].entry_id)
assert new_entry.unique_id is not None
assert new_entry.unique_id != entry.unique_id # Different from first entry
async def test_config_flow_connection_error(hass: HomeAssistant) -> None:
"""Test connection error during authentication."""
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient.authenticate",
side_effect=ClientError("Connection failed"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result2["type"] is FlowResultType.FORM
assert result2["errors"]["base"] == "cannot_connect"
async def test_config_flow_unexpected_error(hass: HomeAssistant) -> None:
"""Test unexpected error during authentication."""
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient.authenticate",
side_effect=Exception("Unexpected error"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result2["type"] is FlowResultType.FORM
assert result2["errors"]["base"] == "unknown_auth_error"
async def test_config_flow_external_step_claimed_during_display(
hass: HomeAssistant,
) -> None:
"""Test when device gets claimed while external step is being displayed."""
call_count = 0
def create_mock_client(*args, **kwargs):
nonlocal call_count
call_count += 1
mock_client = MagicMock()
if call_count == 1:
mock_client.authenticate = AsyncMock(return_value=False)
mock_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
else:
mock_client.authenticate = AsyncMock(return_value=True)
mock_client.recordNumber = TEST_RECORD_NUMBER
mock_client.recordName = TEST_RECORD_NAME
return mock_client
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=create_mock_client,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
# User continues immediately - device is claimed, polling task should be cancelled
result_claimed = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_claimed["type"] is FlowResultType.EXTERNAL_STEP_DONE
final_result = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
await hass.async_block_till_done()
assert final_result["type"] is FlowResultType.CREATE_ENTRY
async def test_config_flow_auth_and_claim_step_not_claimed(hass: HomeAssistant) -> None:
"""Test auth_and_claim step when device is not claimed after polling."""
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=False)
mock_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: "x",
CONF_PROVISIONING_SECRET: "y",
},
)
assert result2["type"] is FlowResultType.EXTERNAL_STEP
# User continues immediately - device still not claimed, polling task should be cancelled
result3 = await hass.config_entries.flow.async_configure(result2["flow_id"])
assert result3["type"] is FlowResultType.EXTERNAL_STEP
assert result3["step_id"] == "auth_and_claim"
async def test_config_flow_reauth_success(
hass: HomeAssistant,
) -> None:
"""Test the reauthentication flow for EnergyID integration (success path)."""
# Existing config entry
entry = MockConfigEntry(
domain=DOMAIN,
unique_id="site_12345",
data={
CONF_PROVISIONING_KEY: "old_key",
CONF_PROVISIONING_SECRET: "old_secret",
CONF_DEVICE_ID: "existing_device",
CONF_DEVICE_NAME: "Existing Device",
},
)
entry.add_to_hass(hass)
# Mock client for successful reauth
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=True)
mock_client.recordNumber = "site_12345"
mock_client.recordName = "My Test Site"
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
):
# Start reauth flow
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": "reauth", "entry_id": entry.entry_id},
data=entry.data,
)
assert result["type"] == "form"
assert result["step_id"] == "reauth_confirm"
# Submit new credentials
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: "new_key",
CONF_PROVISIONING_SECRET: "new_secret",
},
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.ABORT
# Entry should be updated
updated_entry = hass.config_entries.async_get_entry(entry.entry_id)
assert updated_entry.data[CONF_PROVISIONING_KEY] == "new_key"
assert updated_entry.data[CONF_PROVISIONING_SECRET] == "new_secret"
@pytest.mark.parametrize(
("auth_status", "auth_message", "expected_error"),
[
(401, "Unauthorized", "invalid_auth"),
(500, "Server Error", "cannot_connect"),
],
)
async def test_config_flow_client_response_error(
hass: HomeAssistant,
auth_status: int,
auth_message: str,
expected_error: str,
) -> None:
"""Test config flow with ClientResponseError."""
mock_client = MagicMock()
mock_client.authenticate.side_effect = ClientResponseError(
request_info=MagicMock(),
history=(),
status=auth_status,
message=auth_message,
)
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result2["type"] is FlowResultType.FORM
assert result2["errors"]["base"] == expected_error
async def test_config_flow_reauth_needs_claim(hass: HomeAssistant) -> None:
"""Test reauth flow when device needs to be claimed."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id="site_12345",
data={
CONF_PROVISIONING_KEY: "old_key",
CONF_PROVISIONING_SECRET: "old_secret",
CONF_DEVICE_ID: "existing_device",
CONF_DEVICE_NAME: "Existing Device",
},
)
entry.add_to_hass(hass)
# Mock client that needs claiming
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=False)
mock_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": "reauth", "entry_id": entry.entry_id},
data=entry.data,
)
# Check initial reauth form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: "new_key",
CONF_PROVISIONING_SECRET: "new_secret",
},
)
assert result2["type"] is FlowResultType.EXTERNAL_STEP
assert result2["step_id"] == "auth_and_claim"
async def test_async_get_supported_subentry_types(hass: HomeAssistant) -> None:
"""Test async_get_supported_subentry_types returns correct types."""
mock_entry = MockConfigEntry(domain=DOMAIN, data={})
result = EnergyIDConfigFlow.async_get_supported_subentry_types(mock_entry)
assert "sensor_mapping" in result
assert result["sensor_mapping"] == EnergyIDSensorMappingFlowHandler
async def test_polling_stops_on_invalid_auth_error(hass: HomeAssistant) -> None:
"""Test that polling stops when invalid_auth error occurs during auth_and_claim polling."""
mock_unclaimed_client = MagicMock()
mock_unclaimed_client.authenticate = AsyncMock(return_value=False)
mock_unclaimed_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
mock_error_client = MagicMock()
mock_error_client.authenticate = AsyncMock(
side_effect=ClientResponseError(
request_info=MagicMock(),
history=(),
status=401,
)
)
call_count = 0
def mock_webhook_client(*args, **kwargs):
nonlocal call_count
call_count += 1
return mock_unclaimed_client if call_count == 1 else mock_error_client
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=mock_webhook_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
result_done = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_done["type"] is FlowResultType.EXTERNAL_STEP
await hass.async_block_till_done()
async def test_polling_stops_on_cannot_connect_error(hass: HomeAssistant) -> None:
"""Test that polling stops when cannot_connect error occurs during auth_and_claim polling."""
mock_unclaimed_client = MagicMock()
mock_unclaimed_client.authenticate = AsyncMock(return_value=False)
mock_unclaimed_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
mock_error_client = MagicMock()
mock_error_client.authenticate = AsyncMock(
side_effect=ClientError("Connection failed")
)
call_count = 0
def mock_webhook_client(*args, **kwargs):
nonlocal call_count
call_count += 1
return mock_unclaimed_client if call_count == 1 else mock_error_client
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=mock_webhook_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
result_done = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_done["type"] is FlowResultType.EXTERNAL_STEP
await hass.async_block_till_done()
async def test_auth_and_claim_subsequent_auth_error(hass: HomeAssistant) -> None:
"""Test that auth_and_claim step handles authentication errors during polling attempts."""
mock_unclaimed_client = MagicMock()
mock_unclaimed_client.authenticate = AsyncMock(return_value=False)
mock_unclaimed_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
mock_error_client = MagicMock()
mock_error_client.authenticate = AsyncMock(
side_effect=ClientResponseError(
request_info=MagicMock(),
history=(),
status=401,
)
)
call_count = 0
def mock_webhook_client(*args, **kwargs):
nonlocal call_count
call_count += 1
return mock_unclaimed_client if call_count <= 2 else mock_error_client
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=mock_webhook_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
result_done = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_done["type"] is FlowResultType.EXTERNAL_STEP
final_result = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert final_result["type"] is FlowResultType.EXTERNAL_STEP
assert final_result["step_id"] == "auth_and_claim"
async def test_reauth_with_error(hass: HomeAssistant) -> None:
"""Test that reauth flow shows error when authentication fails with 401."""
mock_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_PROVISIONING_KEY: "old_key",
CONF_PROVISIONING_SECRET: "old_secret",
CONF_DEVICE_ID: "test_device_id",
CONF_DEVICE_NAME: "test_device_name",
},
)
mock_entry.add_to_hass(hass)
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(
side_effect=ClientResponseError(
request_info=MagicMock(),
history=(),
status=401,
)
)
with patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
return_value=mock_client,
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={
"source": config_entries.SOURCE_REAUTH,
"entry_id": mock_entry.entry_id,
},
data=mock_entry.data,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: "new_key",
CONF_PROVISIONING_SECRET: "new_secret",
},
)
assert result2["type"] is FlowResultType.FORM
assert result2["errors"]["base"] == "invalid_auth"
async def test_polling_cancellation_on_auth_failure(hass: HomeAssistant) -> None:
"""Test that polling is cancelled when authentication fails during auth_and_claim."""
call_count = 0
auth_call_count = 0
def mock_webhook_client(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
# First client for initial claimless auth
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=False)
mock_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
return mock_client
# Subsequent client for polling check - fails authentication
mock_client = MagicMock()
async def auth_with_error():
nonlocal auth_call_count
auth_call_count += 1
raise ClientError("Connection failed")
mock_client.authenticate = auth_with_error
return mock_client
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=mock_webhook_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# Start auth_and_claim flow - sets up polling
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
# Wait for polling task to encounter the error and stop
await hass.async_block_till_done()
# Verify polling stopped after the error
# auth_call_count should be 1 (one failed attempt during polling)
initial_auth_count = auth_call_count
assert initial_auth_count == 1
# Trigger user continuing the flow - polling should already be stopped
result_failed = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_failed["type"] is FlowResultType.EXTERNAL_STEP
assert result_failed["step_id"] == "auth_and_claim"
# Wait a bit and verify no further authentication attempts occurred
await hass.async_block_till_done()
assert (
auth_call_count == initial_auth_count + 1
) # One more for the manual check
async def test_polling_cancellation_on_success(hass: HomeAssistant) -> None:
"""Test that polling is cancelled when device becomes claimed successfully during auth_and_claim."""
call_count = 0
auth_call_count = 0
def mock_webhook_client(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
# First client for initial claimless auth
mock_client = MagicMock()
mock_client.authenticate = AsyncMock(return_value=False)
mock_client.get_claim_info.return_value = {"claim_url": "http://claim.me"}
return mock_client
# Subsequent client for polling check - device now claimed
mock_client = MagicMock()
async def auth_success():
nonlocal auth_call_count
auth_call_count += 1
return True
mock_client.authenticate = auth_success
mock_client.recordNumber = TEST_RECORD_NUMBER
mock_client.recordName = TEST_RECORD_NAME
return mock_client
with (
patch(
"homeassistant.components.energyid.config_flow.WebhookClient",
side_effect=mock_webhook_client,
),
patch("homeassistant.components.energyid.config_flow.asyncio.sleep"),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Check initial form
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# Start auth_and_claim flow - sets up polling task
result_external = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
},
)
assert result_external["type"] is FlowResultType.EXTERNAL_STEP
# Wait for polling to detect the device is claimed and advance the flow
await hass.async_block_till_done()
# Verify polling made authentication attempt
# auth_call_count should be 1 (polling detected device is claimed)
assert auth_call_count >= 1
claimed_auth_count = auth_call_count
# User continues - device is already claimed, polling should be cancelled
result_done = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert result_done["type"] is FlowResultType.EXTERNAL_STEP_DONE
# Verify polling was cancelled - the auth count should only increase by 1
# (for the manual check when user continues, not from polling)
assert auth_call_count == claimed_auth_count + 1
# Final call to create entry
final_result = await hass.config_entries.flow.async_configure(
result_external["flow_id"]
)
assert final_result["type"] is FlowResultType.CREATE_ENTRY
# Wait a bit and verify no further authentication attempts from polling
await hass.async_block_till_done()
final_auth_count = auth_call_count
# Ensure all background tasks have completed and polling really stopped
await hass.async_block_till_done()
# No new auth attempts should have occurred (polling was cancelled)
assert auth_call_count == final_auth_count

View File

@@ -0,0 +1,416 @@
"""Test EnergyID sensor mapping subentry flow (direct handler tests)."""
from typing import Any
from unittest.mock import patch
import pytest
from homeassistant.components.energyid.const import (
CONF_DEVICE_ID,
CONF_ENERGYID_KEY,
CONF_HA_ENTITY_UUID,
CONF_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET,
DOMAIN,
)
from homeassistant.components.energyid.energyid_sensor_mapping_flow import (
EnergyIDSensorMappingFlowHandler,
_get_suggested_entities,
_validate_mapping_input,
)
from homeassistant.components.sensor import SensorDeviceClass, SensorStateClass
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import InvalidData
from homeassistant.helpers import entity_registry as er
from tests.common import MockConfigEntry
TEST_PROVISIONING_KEY = "test_prov_key"
TEST_PROVISIONING_SECRET = "test_prov_secret"
TEST_RECORD_NUMBER = "site_12345"
TEST_RECORD_NAME = "My Test Site"
@pytest.fixture
def mock_parent_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Return a mock parent config entry."""
entry = MockConfigEntry(
domain=DOMAIN,
title="Mock Title",
data={
"provisioning_key": "test_key",
"provisioning_secret": "test_secret",
"device_id": "test_device",
"device_name": "Test Device",
},
entry_id="parent_entry_id",
)
entry.add_to_hass(hass)
return entry
async def test_user_step_form(
hass: HomeAssistant, mock_parent_entry: MockConfigEntry
) -> None:
"""Test the user step form is shown."""
mock_parent_entry.add_to_hass(hass)
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert "ha_entity_id" in result["data_schema"].schema
async def test_successful_creation(
hass: HomeAssistant,
mock_parent_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test successful creation of a mapping."""
mock_parent_entry.add_to_hass(hass)
entity_entry = entity_registry.async_get_or_create(
"sensor", "test", "power_2", suggested_object_id="test_power"
)
hass.states.async_set("sensor.test_power", "50")
# Start the subentry flow
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
# Submit user input
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": entity_entry.entity_id}
)
assert result["type"] == "create_entry"
assert result["title"] == "test_power connection to EnergyID"
assert result["data"][CONF_HA_ENTITY_UUID] == entity_entry.id
assert result["data"][CONF_ENERGYID_KEY] == "test_power"
async def test_entity_already_mapped(
hass: HomeAssistant,
mock_parent_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test mapping an already mapped entity."""
mock_parent_entry.add_to_hass(hass)
entity_entry = entity_registry.async_get_or_create(
"sensor", "test", "power_3", suggested_object_id="already_mapped"
)
hass.states.async_set("sensor.already_mapped", "75")
# Add a subentry with this entity UUID
sub_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HA_ENTITY_UUID: entity_entry.id,
CONF_ENERGYID_KEY: "already_mapped",
},
entry_id="sub_entry_1",
)
sub_entry.parent_entry_id = mock_parent_entry.entry_id
sub_entry.add_to_hass(hass)
await hass.async_block_till_done()
# Start the subentry flow
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
# Submit user input
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": entity_entry.entity_id}
)
# The current flow allows remapping, so expect create_entry
assert result["type"] == "create_entry"
async def test_entity_not_found(
hass: HomeAssistant, mock_parent_entry: MockConfigEntry
) -> None:
"""Test error when entity is not found."""
mock_parent_entry.add_to_hass(hass)
# Start the subentry flow
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
# Submit user input with nonexistent entity
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": "sensor.nonexistent"}
)
assert result["type"] == "form"
assert result["errors"]["base"] == "entity_not_found"
async def test_no_entity_selected(
hass: HomeAssistant, mock_parent_entry: MockConfigEntry
) -> None:
"""Test error when no entity is selected."""
mock_parent_entry.add_to_hass(hass)
# Start the subentry flow
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
# Submit user input with empty entity, expect InvalidData
with pytest.raises(InvalidData) as excinfo:
await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": ""}
)
# Only check for the generic schema error message
assert "Schema validation failed" in str(excinfo.value)
async def test_entity_disappears_after_validation(
hass: HomeAssistant,
mock_parent_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test entity disappears after validation but before lookup."""
mock_parent_entry.add_to_hass(hass)
entity_entry = entity_registry.async_get_or_create(
"sensor", "test", "vanishing", suggested_object_id="vanish"
)
hass.states.async_set("sensor.vanish", "42")
# Start the subentry flow
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
# Remove the entity from the registry after validation but before registry lookup
entity_registry.async_remove(entity_entry.entity_id)
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": entity_entry.entity_id}
)
assert result["type"] == "form"
assert result["errors"]["base"] == "entity_not_found"
async def test_no_suitable_entities(
hass: HomeAssistant, mock_parent_entry: MockConfigEntry
) -> None:
"""Test form when no suitable entities exist."""
mock_parent_entry.add_to_hass(hass)
# Start the subentry flow with an empty registry
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
# The data_schema should still be present, but the selector will be empty
assert "ha_entity_id" in result["data_schema"].schema
@pytest.mark.parametrize(
("entities_to_create"),
[
([]), # empty case
([("light", "test", "not_sensor", "not_sensor")]), # non-sensor case
],
)
def test_get_suggested_entities_no_suitable_entities(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
entities_to_create: list[tuple[str, str, str, str]],
) -> None:
"""Test _get_suggested_entities returns empty list if no suitable entities."""
for domain, platform, unique_id, suggested_object_id in entities_to_create:
entity_registry.async_get_or_create(
domain, platform, unique_id, suggested_object_id=suggested_object_id
)
assert _get_suggested_entities(hass) == []
def test_energyid_sensor_mapping_flow_handler_repr() -> None:
"""Test instantiating and repr-ing the handler."""
handler = EnergyIDSensorMappingFlowHandler()
assert handler.__class__.__name__ == "EnergyIDSensorMappingFlowHandler"
async def test_duplicate_entity_key(
hass: HomeAssistant,
mock_parent_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test mapping two entities with the same suggested object id."""
mock_parent_entry.add_to_hass(hass)
entity1 = entity_registry.async_get_or_create(
"sensor", "test", "unique1", suggested_object_id="dup"
)
entity2 = entity_registry.async_get_or_create(
"sensor", "test", "unique2", suggested_object_id="dup"
)
hass.states.async_set("sensor.dup", "10")
# Map first entity
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": entity1.entity_id}
)
assert result["type"] == "create_entry"
# Map second entity
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": entity2.entity_id}
)
assert result["type"] == "create_entry"
def test_validate_mapping_input_none_entity(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test that _validate_mapping_input returns 'entity_required' error for None entity."""
errors = _validate_mapping_input(None, set(), entity_registry)
assert errors == {"base": "entity_required"}
def test_validate_mapping_input_empty_string(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test that _validate_mapping_input returns 'entity_required' error for empty string entity."""
errors = _validate_mapping_input("", set(), entity_registry)
assert errors == {"base": "entity_required"}
def test_validate_mapping_input_already_mapped(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test that _validate_mapping_input returns 'entity_already_mapped' error when entity is already mapped."""
entity_entry = entity_registry.async_get_or_create(
"sensor", "test", "mapped_entity", suggested_object_id="mapped"
)
current_mappings = {entity_entry.id}
errors = _validate_mapping_input(
entity_entry.entity_id, current_mappings, entity_registry
)
assert errors == {"base": "entity_already_mapped"}
def test_get_suggested_entities_with_state_class(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test that _get_suggested_entities includes sensor entities with measurement state class."""
entity_entry = entity_registry.async_get_or_create(
"sensor",
"test",
"measurement_sensor",
suggested_object_id="measurement",
capabilities={"state_class": SensorStateClass.MEASUREMENT},
)
hass.states.async_set(entity_entry.entity_id, "100")
result = _get_suggested_entities(hass)
assert entity_entry.entity_id in result
@pytest.mark.parametrize(
("test_case"),
[
{
"name": "energy_original_device_class",
"unique_id": "energy_sensor",
"entity_id": "sensor.energy_test",
"original_device_class": SensorDeviceClass.ENERGY,
"device_class": None,
"state_value": "250",
},
{
"name": "power_original_device_class",
"unique_id": "power_sensor",
"entity_id": "sensor.power_test",
"original_device_class": SensorDeviceClass.POWER,
"device_class": None,
"state_value": "1500",
},
{
"name": "energy_user_override_device_class",
"unique_id": "override_sensor",
"entity_id": "sensor.override_test",
"original_device_class": None,
"device_class": SensorDeviceClass.ENERGY,
"state_value": "300",
},
],
ids=lambda x: x["name"],
)
def test_get_suggested_entities_with_device_class(
hass: HomeAssistant, entity_registry: er.EntityRegistry, test_case: dict[str, Any]
) -> None:
"""Test that _get_suggested_entities includes sensor entities with various device class configurations."""
entity_entry = entity_registry.async_get_or_create(
"sensor",
"test",
test_case["unique_id"],
suggested_object_id=test_case["entity_id"].split(".", 1)[-1],
original_device_class=test_case["original_device_class"],
)
if test_case["device_class"] is not None:
entity_registry.async_update_entity(
entity_entry.entity_id, device_class=test_case["device_class"]
)
hass.states.async_set(test_case["entity_id"], test_case["state_value"])
result = _get_suggested_entities(hass)
assert test_case["entity_id"] in result
async def test_subentry_entity_not_found_after_validation(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
) -> None:
"""Test that subentry flow returns error when entity is validated but disappears before registry lookup."""
mock_parent_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_PROVISIONING_KEY: TEST_PROVISIONING_KEY,
CONF_PROVISIONING_SECRET: TEST_PROVISIONING_SECRET,
CONF_DEVICE_ID: "test_device",
},
)
mock_parent_entry.add_to_hass(hass)
entity_entry = entity_registry.async_get_or_create(
"sensor", "test", "disappearing", suggested_object_id="disappear"
)
hass.states.async_set(entity_entry.entity_id, "42")
result = await hass.config_entries.subentries.async_init(
(mock_parent_entry.entry_id, "sensor_mapping"),
context={"source": "user"},
)
assert result["type"] == "form"
original_async_get = entity_registry.async_get
def mock_async_get_disappearing(entity_id):
if not hasattr(mock_async_get_disappearing, "call_count"):
mock_async_get_disappearing.call_count = 0
mock_async_get_disappearing.call_count += 1
# First call (validation) succeeds, second call (lookup) fails
return (
None
if mock_async_get_disappearing.call_count > 1
else original_async_get(entity_id)
)
with patch.object(
entity_registry, "async_get", side_effect=mock_async_get_disappearing
):
result = await hass.config_entries.subentries.async_configure(
result["flow_id"], {"ha_entity_id": entity_entry.entity_id}
)
assert result["type"] == "form"
assert result["errors"]["base"] == "entity_not_found"

File diff suppressed because it is too large Load Diff