Add get_kvs_value and set_kvs_value actions for Shelly RPC devices (#157349)

This commit is contained in:
Maciej Bieniek
2025-12-12 13:15:25 +01:00
committed by GitHub
parent 41d5415c86
commit 6e99411084
8 changed files with 559 additions and 9 deletions

View File

@@ -63,6 +63,7 @@ from .repairs import (
async_manage_open_wifi_ap_issue,
async_manage_outbound_websocket_incorrectly_enabled_issue,
)
from .services import async_setup_services
from .utils import (
async_create_issue_unsupported_firmware,
async_migrate_rpc_virtual_components_unique_ids,
@@ -117,6 +118,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
if (conf := config.get(DOMAIN)) is not None:
hass.data[DOMAIN] = {CONF_COAP_PORT: conf[CONF_COAP_PORT]}
async_setup_services(hass)
return True

View File

@@ -343,3 +343,6 @@ MODEL_FRANKEVER_IRRIGATION_CONTROLLER = "Irrigation"
ROLE_GENERIC = "generic"
TRV_CHANNEL = 0
ATTR_KEY = "key"
ATTR_VALUE = "value"

View File

@@ -105,5 +105,13 @@
}
}
}
},
"services": {
"get_kvs_value": {
"service": "mdi:import"
},
"set_kvs_value": {
"service": "mdi:export"
}
}
}

View File

@@ -1,17 +1,13 @@
rules:
# Bronze
action-setup:
status: exempt
comment: The integration does not register services.
action-setup: done
appropriate-polling: done
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: The integration does not register services.
docs-actions: done
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
@@ -24,9 +20,7 @@ rules:
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: The integration does not register services.
action-exceptions: done
config-entry-unloading: done
docs-configuration-parameters: done
docs-installation-parameters: done

View File

@@ -0,0 +1,170 @@
"""Support for services."""
from typing import TYPE_CHECKING, Any, cast
from aioshelly.const import RPC_GENERATIONS
from aioshelly.exceptions import DeviceConnectionError, RpcCallError
import voluptuous as vol
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import ATTR_DEVICE_ID
from homeassistant.core import (
HomeAssistant,
ServiceCall,
ServiceResponse,
SupportsResponse,
callback,
)
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.util.json import JsonValueType
from .const import ATTR_KEY, ATTR_VALUE, CONF_SLEEP_PERIOD, DOMAIN
from .coordinator import ShellyConfigEntry
from .utils import get_device_entry_gen
SERVICE_GET_KVS_VALUE = "get_kvs_value"
SERVICE_SET_KVS_VALUE = "set_kvs_value"
SERVICE_GET_KVS_VALUE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_DEVICE_ID): cv.string,
vol.Required(ATTR_KEY): str,
}
)
SERVICE_SET_KVS_VALUE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_DEVICE_ID): cv.string,
vol.Required(ATTR_KEY): str,
vol.Required(ATTR_VALUE): vol.Any(str, int, float, bool, dict, list, None),
}
)
@callback
def async_get_config_entry_for_service_call(
call: ServiceCall,
) -> ShellyConfigEntry:
"""Get the config entry related to a service call (by device ID)."""
device_registry = dr.async_get(call.hass)
device_id = call.data[ATTR_DEVICE_ID]
if (device_entry := device_registry.async_get(device_id)) is None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_device_id",
translation_placeholders={"device_id": device_id},
)
for entry_id in device_entry.config_entries:
config_entry = call.hass.config_entries.async_get_entry(entry_id)
if TYPE_CHECKING:
assert config_entry
if config_entry.domain != DOMAIN:
continue
if config_entry.state is not ConfigEntryState.LOADED:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="entry_not_loaded",
translation_placeholders={"device": config_entry.title},
)
if get_device_entry_gen(config_entry) not in RPC_GENERATIONS:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="kvs_not_supported",
translation_placeholders={"device": config_entry.title},
)
if config_entry.data.get(CONF_SLEEP_PERIOD, 0) > 0:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="kvs_not_supported",
translation_placeholders={"device": config_entry.title},
)
return config_entry
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="config_entry_not_found",
translation_placeholders={"device_id": device_id},
)
async def _async_execute_action(
call: ServiceCall, method: str, args: tuple
) -> dict[str, Any]:
"""Execute action on the device."""
config_entry = async_get_config_entry_for_service_call(call)
runtime_data = config_entry.runtime_data
if not runtime_data.rpc:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="device_not_initialized",
translation_placeholders={"device": config_entry.title},
)
action_method = getattr(runtime_data.rpc.device, method)
try:
response = await action_method(*args)
except RpcCallError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="rpc_call_error",
translation_placeholders={"device": config_entry.title},
) from err
except DeviceConnectionError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="device_communication_error",
translation_placeholders={"device": config_entry.title},
) from err
else:
return cast(dict[str, Any], response)
async def async_get_kvs_value(call: ServiceCall) -> ServiceResponse:
"""Handle the get_kvs_value service call."""
key = call.data[ATTR_KEY]
response = await _async_execute_action(call, "kvs_get", (key,))
result: dict[str, JsonValueType] = {}
result[ATTR_VALUE] = response[ATTR_VALUE]
return result
async def async_set_kvs_value(call: ServiceCall) -> None:
"""Handle the set_kvs_value service call."""
await _async_execute_action(
call, "kvs_set", (call.data[ATTR_KEY], call.data[ATTR_VALUE])
)
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up the services for Shelly integration."""
for service, method, schema, response in (
(
SERVICE_GET_KVS_VALUE,
async_get_kvs_value,
SERVICE_GET_KVS_VALUE_SCHEMA,
SupportsResponse.ONLY,
),
(
SERVICE_SET_KVS_VALUE,
async_set_kvs_value,
SERVICE_SET_KVS_VALUE_SCHEMA,
SupportsResponse.NONE,
),
):
hass.services.async_register(
DOMAIN,
service,
method,
schema=schema,
supports_response=response,
)

View File

@@ -0,0 +1,27 @@
get_kvs_value:
fields:
device_id:
required: true
selector:
device:
integration: shelly
key:
required: true
selector:
text:
set_kvs_value:
fields:
device_id:
required: true
selector:
device:
integration: shelly
key:
required: true
selector:
text:
value:
required: true
selector:
object:

View File

@@ -603,6 +603,9 @@
"auth_error": {
"message": "Authentication failed for {device}, please update your credentials"
},
"config_entry_not_found": {
"message": "Config entry for device ID {device_id} not found"
},
"device_communication_action_error": {
"message": "Device communication error occurred while calling action for {entity} of {device}"
},
@@ -612,12 +615,24 @@
"device_not_found": {
"message": "{device} not found while configuring device automation triggers"
},
"device_not_initialized": {
"message": "{device} not initialized"
},
"entry_not_loaded": {
"message": "Config entry not loaded for {device}"
},
"firmware_unsupported": {
"message": "{device} is running an unsupported firmware, please update the firmware"
},
"invalid_device_id": {
"message": "Invalid device ID specified: {device_id}"
},
"invalid_trigger": {
"message": "Invalid device automation trigger (type, subtype): {trigger}"
},
"kvs_not_supported": {
"message": "{device} does not support KVS"
},
"ota_update_connection_error": {
"message": "Device communication error occurred while triggering OTA update for {device}"
},
@@ -627,6 +642,9 @@
"rpc_call_action_error": {
"message": "RPC call error occurred while calling action for {entity} of {device}"
},
"rpc_call_error": {
"message": "RPC call error occurred for {device}"
},
"update_error": {
"message": "An error occurred while retrieving data from {device}"
},
@@ -748,5 +766,39 @@
"manual": "Enter address manually"
}
}
},
"services": {
"get_kvs_value": {
"description": "Get a value from the device's Key-Value Storage.",
"fields": {
"device_id": {
"description": "The ID of the Shelly device to get the KVS value from.",
"name": "Device"
},
"key": {
"description": "The name of the key for which the KVS value will be retrieved.",
"name": "Key"
}
},
"name": "Get KVS value"
},
"set_kvs_value": {
"description": "Set a value in the device's Key-Value Storage.",
"fields": {
"device_id": {
"description": "The ID of the Shelly device to set the KVS value.",
"name": "Device"
},
"key": {
"description": "The name of the key under which the KVS value will be stored.",
"name": "Key"
},
"value": {
"description": "Value to set.",
"name": "Value"
}
},
"name": "Set KVS value"
}
}
}

View File

@@ -0,0 +1,293 @@
"""Tests for Shelly services."""
from unittest.mock import Mock
from aioshelly.exceptions import DeviceConnectionError, RpcCallError
import pytest
from homeassistant.components.shelly.const import ATTR_KEY, ATTR_VALUE, DOMAIN
from homeassistant.components.shelly.services import (
SERVICE_GET_KVS_VALUE,
SERVICE_SET_KVS_VALUE,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import ATTR_DEVICE_ID
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers import device_registry as dr
from . import init_integration
from tests.common import MockConfigEntry
async def test_service_get_kvs_value(
hass: HomeAssistant,
mock_rpc_device: Mock,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test get_kvs_value service."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
mock_rpc_device.kvs_get.return_value = {
"etag": "16mLia9TRt8lGhj9Zf5Dp6Hw==",
"value": "test_value",
}
response = await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert response == {"value": "test_value"}
mock_rpc_device.kvs_get.assert_called_once_with("test_key")
async def test_service_get_kvs_value_invalid_device(hass: HomeAssistant) -> None:
"""Test get_kvs_value service with invalid device ID."""
await init_integration(hass, 2)
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: "invalid_device_id", ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "invalid_device_id"
assert exc_info.value.translation_placeholders == {
ATTR_DEVICE_ID: "invalid_device_id"
}
async def test_service_get_kvs_value_block_device(
hass: HomeAssistant, mock_block_device: Mock, device_registry: dr.DeviceRegistry
) -> None:
"""Test get_kvs_value service with non-RPC (Gen1) device."""
entry = await init_integration(hass, 1)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "kvs_not_supported"
assert exc_info.value.translation_placeholders == {"device": entry.title}
@pytest.mark.parametrize(
("exc", "translation_key"),
[
(RpcCallError(999), "rpc_call_error"),
(DeviceConnectionError, "device_communication_error"),
],
)
async def test_service_get_kvs_value_exc(
hass: HomeAssistant,
mock_rpc_device: Mock,
device_registry: dr.DeviceRegistry,
exc: Exception,
translation_key: str,
) -> None:
"""Test get_kvs_value service with exception."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
mock_rpc_device.kvs_get.side_effect = exc
with pytest.raises(HomeAssistantError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == translation_key
assert exc_info.value.translation_placeholders == {"device": entry.title}
async def test_config_entry_not_loaded(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
mock_rpc_device: Mock,
) -> None:
"""Test config entry not loaded."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state is ConfigEntryState.NOT_LOADED
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "entry_not_loaded"
assert exc_info.value.translation_placeholders == {"device": entry.title}
async def test_service_get_kvs_value_sleeping_device(
hass: HomeAssistant, mock_rpc_device: Mock, device_registry: dr.DeviceRegistry
) -> None:
"""Test get_kvs_value service with RPC sleeping device."""
entry = await init_integration(hass, 2, sleep_period=1000)
# Make device online
mock_rpc_device.mock_online()
await hass.async_block_till_done(wait_background_tasks=True)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "kvs_not_supported"
assert exc_info.value.translation_placeholders == {"device": entry.title}
async def test_service_set_kvs_value(
hass: HomeAssistant,
mock_rpc_device: Mock,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test set_kvs_value service."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
await hass.services.async_call(
DOMAIN,
SERVICE_SET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key", ATTR_VALUE: "test_value"},
blocking=True,
)
mock_rpc_device.kvs_set.assert_called_once_with("test_key", "test_value")
async def test_service_get_kvs_value_config_entry_not_found(
hass: HomeAssistant, mock_rpc_device: Mock, device_registry: dr.DeviceRegistry
) -> None:
"""Test device with no config entries."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
# Remove all config entries from device
device_registry.devices[device.id].config_entries.clear()
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "config_entry_not_found"
assert exc_info.value.translation_placeholders == {"device_id": device.id}
async def test_service_get_kvs_value_device_not_initialized(
hass: HomeAssistant,
mock_rpc_device: Mock,
device_registry: dr.DeviceRegistry,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test get_kvs_value if runtime_data.rpc is None."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
monkeypatch.delattr(entry.runtime_data, "rpc")
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "device_not_initialized"
assert exc_info.value.translation_placeholders == {"device": entry.title}
async def test_service_get_kvs_value_wrong_domain(
hass: HomeAssistant,
mock_rpc_device: Mock,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test get_kvs_value when device has config entries from different domains."""
entry = await init_integration(hass, 2)
device = dr.async_entries_for_config_entry(device_registry, entry.entry_id)[0]
# Create a config entry with different domain and add it to the device
other_entry = MockConfigEntry(
domain="other_domain",
data={},
)
other_entry.add_to_hass(hass)
# Add the other domain's config entry to the device
device_registry.async_update_device(
device.id, add_config_entry_id=other_entry.entry_id
)
# Remove the original Shelly config entry
device_registry.async_update_device(
device.id, remove_config_entry_id=entry.entry_id
)
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
SERVICE_GET_KVS_VALUE,
{ATTR_DEVICE_ID: device.id, ATTR_KEY: "test_key"},
blocking=True,
return_response=True,
)
assert exc_info.value.translation_domain == DOMAIN
assert exc_info.value.translation_key == "config_entry_not_found"
assert exc_info.value.translation_placeholders == {"device_id": device.id}