From 78e16495bde626701cd1d18fcec7db8f6dd26c5b Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Thu, 2 Oct 2025 23:15:15 +0200 Subject: [PATCH] Remove runtime support for recorder DB without States.last_reported_ts (#153495) --- homeassistant/components/recorder/const.py | 1 - homeassistant/components/recorder/core.py | 3 +- .../components/recorder/history/modern.py | 28 +- tests/components/recorder/db_schema_42.py | 859 -------------- .../recorder/test_history_db_schema_42.py | 1022 ----------------- 5 files changed, 10 insertions(+), 1903 deletions(-) delete mode 100644 tests/components/recorder/db_schema_42.py delete mode 100644 tests/components/recorder/test_history_db_schema_42.py diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py index 4797eecda0f..b1563d85d56 100644 --- a/homeassistant/components/recorder/const.py +++ b/homeassistant/components/recorder/const.py @@ -53,7 +53,6 @@ KEEPALIVE_TIME = 30 CONTEXT_ID_AS_BINARY_SCHEMA_VERSION = 36 EVENT_TYPE_IDS_SCHEMA_VERSION = 37 STATES_META_SCHEMA_VERSION = 38 -LAST_REPORTED_SCHEMA_VERSION = 43 CIRCULAR_MEAN_SCHEMA_VERSION = 49 LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION = 28 diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index b135f7a3ee8..d662416012f 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -56,7 +56,6 @@ from .const import ( DEFAULT_MAX_BIND_VARS, DOMAIN, KEEPALIVE_TIME, - LAST_REPORTED_SCHEMA_VERSION, MARIADB_PYMYSQL_URL_PREFIX, MARIADB_URL_PREFIX, MAX_QUEUE_BACKLOG_MIN_VALUE, @@ -1226,7 +1225,7 @@ class Recorder(threading.Thread): if ( pending_last_reported := self.states_manager.get_pending_last_reported_timestamp() - ) and self.schema_version >= LAST_REPORTED_SCHEMA_VERSION: + ): with session.no_autoflush: session.execute( update(States), diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py index 566e30713f0..a636ed34ef4 100644 --- a/homeassistant/components/recorder/history/modern.py +++ b/homeassistant/components/recorder/history/modern.py @@ -29,7 +29,7 @@ from homeassistant.helpers.recorder import get_instance from homeassistant.util import dt as dt_util from homeassistant.util.collection import chunked_or_all -from ..const import LAST_REPORTED_SCHEMA_VERSION, MAX_IDS_FOR_INDEXED_GROUP_BY +from ..const import MAX_IDS_FOR_INDEXED_GROUP_BY from ..db_schema import ( SHARED_ATTR_OR_LEGACY_ATTRIBUTES, StateAttributes, @@ -388,10 +388,9 @@ def _state_changed_during_period_stmt( limit: int | None, include_start_time_state: bool, run_start_ts: float | None, - include_last_reported: bool, ) -> Select | CompoundSelect: stmt = ( - _stmt_and_join_attributes(no_attributes, False, include_last_reported) + _stmt_and_join_attributes(no_attributes, False, True) .filter( ( (States.last_changed_ts == States.last_updated_ts) @@ -424,22 +423,22 @@ def _state_changed_during_period_stmt( single_metadata_id, no_attributes, False, - include_last_reported, + True, ).subquery(), no_attributes, False, - include_last_reported, + True, ), _select_from_subquery( stmt.subquery(), no_attributes, False, - include_last_reported, + True, ), ).subquery(), no_attributes, False, - include_last_reported, + True, ) @@ -454,9 +453,6 @@ def state_changes_during_period( include_start_time_state: bool = True, ) -> dict[str, list[State]]: """Return states changes during UTC period start_time - end_time.""" - has_last_reported = ( - 
get_instance(hass).schema_version >= LAST_REPORTED_SCHEMA_VERSION - ) if not entity_id: raise ValueError("entity_id must be provided") entity_ids = [entity_id.lower()] @@ -489,14 +485,12 @@ def state_changes_during_period( limit, include_start_time_state, oldest_ts, - has_last_reported, ), track_on=[ bool(end_time_ts), no_attributes, bool(limit), include_start_time_state, - has_last_reported, ], ) return cast( @@ -543,10 +537,10 @@ def _get_last_state_changes_single_stmt(metadata_id: int) -> Select: def _get_last_state_changes_multiple_stmt( - number_of_states: int, metadata_id: int, include_last_reported: bool + number_of_states: int, metadata_id: int ) -> Select: return ( - _stmt_and_join_attributes(False, False, include_last_reported) + _stmt_and_join_attributes(False, False, True) .where( States.state_id == ( @@ -568,9 +562,6 @@ def get_last_state_changes( hass: HomeAssistant, number_of_states: int, entity_id: str ) -> dict[str, list[State]]: """Return the last number_of_states.""" - has_last_reported = ( - get_instance(hass).schema_version >= LAST_REPORTED_SCHEMA_VERSION - ) entity_id_lower = entity_id.lower() entity_ids = [entity_id_lower] @@ -595,9 +586,8 @@ def get_last_state_changes( else: stmt = lambda_stmt( lambda: _get_last_state_changes_multiple_stmt( - number_of_states, metadata_id, has_last_reported + number_of_states, metadata_id ), - track_on=[has_last_reported], ) states = list(execute_stmt_lambda_element(session, stmt, orm_rows=False)) return cast( diff --git a/tests/components/recorder/db_schema_42.py b/tests/components/recorder/db_schema_42.py deleted file mode 100644 index b0cdecd88dc..00000000000 --- a/tests/components/recorder/db_schema_42.py +++ /dev/null @@ -1,859 +0,0 @@ -"""Models for SQLAlchemy. - -This file contains the model definitions for schema version 42. -It is used to test the schema migration logic. 
-""" - -from __future__ import annotations - -from collections.abc import Callable -from datetime import datetime, timedelta -import logging -import time -from typing import Any, Self, cast - -import ciso8601 -from fnv_hash_fast import fnv1a_32 -from sqlalchemy import ( - CHAR, - JSON, - BigInteger, - Boolean, - ColumnElement, - DateTime, - Float, - ForeignKey, - Identity, - Index, - Integer, - LargeBinary, - SmallInteger, - String, - Text, - case, - type_coerce, -) -from sqlalchemy.dialects import mysql, oracle, postgresql, sqlite -from sqlalchemy.engine.interfaces import Dialect -from sqlalchemy.ext.compiler import compiles -from sqlalchemy.orm import DeclarativeBase, Mapped, aliased, mapped_column, relationship -from sqlalchemy.types import TypeDecorator - -from homeassistant.components.recorder.const import ( - ALL_DOMAIN_EXCLUDE_ATTRS, - SupportedDialect, -) -from homeassistant.components.recorder.models import ( - StatisticData, - StatisticDataTimestamp, - StatisticMetaData, - bytes_to_ulid_or_none, - bytes_to_uuid_hex_or_none, - datetime_to_timestamp_or_none, - process_timestamp, - ulid_to_bytes_or_none, - uuid_hex_to_bytes_or_none, -) -from homeassistant.components.sensor import ATTR_STATE_CLASS -from homeassistant.const import ( - ATTR_DEVICE_CLASS, - ATTR_FRIENDLY_NAME, - ATTR_UNIT_OF_MEASUREMENT, - MATCH_ALL, - MAX_LENGTH_EVENT_EVENT_TYPE, - MAX_LENGTH_STATE_ENTITY_ID, - MAX_LENGTH_STATE_STATE, -) -from homeassistant.core import Context, Event, EventOrigin, State -from homeassistant.helpers.json import JSON_DUMP, json_bytes, json_bytes_strip_null -from homeassistant.util import dt as dt_util -from homeassistant.util.json import ( - JSON_DECODE_EXCEPTIONS, - json_loads, - json_loads_object, -) - - -# SQLAlchemy Schema -class Base(DeclarativeBase): - """Base class for tables.""" - - -SCHEMA_VERSION = 42 - -_LOGGER = logging.getLogger(__name__) - -TABLE_EVENTS = "events" -TABLE_EVENT_DATA = "event_data" -TABLE_EVENT_TYPES = "event_types" -TABLE_STATES = "states" -TABLE_STATE_ATTRIBUTES = "state_attributes" -TABLE_STATES_META = "states_meta" -TABLE_RECORDER_RUNS = "recorder_runs" -TABLE_SCHEMA_CHANGES = "schema_changes" -TABLE_STATISTICS = "statistics" -TABLE_STATISTICS_META = "statistics_meta" -TABLE_STATISTICS_RUNS = "statistics_runs" -TABLE_STATISTICS_SHORT_TERM = "statistics_short_term" - -STATISTICS_TABLES = ("statistics", "statistics_short_term") - -MAX_STATE_ATTRS_BYTES = 16384 -MAX_EVENT_DATA_BYTES = 32768 - -PSQL_DIALECT = SupportedDialect.POSTGRESQL - -ALL_TABLES = [ - TABLE_STATES, - TABLE_STATE_ATTRIBUTES, - TABLE_EVENTS, - TABLE_EVENT_DATA, - TABLE_EVENT_TYPES, - TABLE_RECORDER_RUNS, - TABLE_SCHEMA_CHANGES, - TABLE_STATES_META, - TABLE_STATISTICS, - TABLE_STATISTICS_META, - TABLE_STATISTICS_RUNS, - TABLE_STATISTICS_SHORT_TERM, -] - -TABLES_TO_CHECK = [ - TABLE_STATES, - TABLE_EVENTS, - TABLE_RECORDER_RUNS, - TABLE_SCHEMA_CHANGES, -] - -LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts" -METADATA_ID_LAST_UPDATED_INDEX_TS = "ix_states_metadata_id_last_updated_ts" -EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin" -STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin" -LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id" -LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated_ts" -CONTEXT_ID_BIN_MAX_LENGTH = 16 - -MYSQL_COLLATE = "utf8mb4_unicode_ci" -MYSQL_DEFAULT_CHARSET = "utf8mb4" -MYSQL_ENGINE = "InnoDB" - -_DEFAULT_TABLE_ARGS = { - "mysql_default_charset": MYSQL_DEFAULT_CHARSET, - "mysql_collate": MYSQL_COLLATE, - "mysql_engine": 
MYSQL_ENGINE, - "mariadb_default_charset": MYSQL_DEFAULT_CHARSET, - "mariadb_collate": MYSQL_COLLATE, - "mariadb_engine": MYSQL_ENGINE, -} - - -class UnusedDateTime(DateTime): - """An unused column type that behaves like a datetime.""" - - -class Unused(CHAR): - """An unused column type that behaves like a string.""" - - -@compiles(UnusedDateTime, "mysql", "mariadb", "sqlite") # type: ignore[misc,no-untyped-call] -@compiles(Unused, "mysql", "mariadb", "sqlite") # type: ignore[misc,no-untyped-call] -def compile_char_zero(type_: TypeDecorator, compiler: Any, **kw: Any) -> str: - """Compile UnusedDateTime and Unused as CHAR(0) on mysql, mariadb, and sqlite.""" - return "CHAR(0)" # Uses 1 byte on MySQL (no change on sqlite) - - -@compiles(Unused, "postgresql") # type: ignore[misc,no-untyped-call] -def compile_char_one(type_: TypeDecorator, compiler: Any, **kw: Any) -> str: - """Compile Unused as CHAR(1) on postgresql.""" - return "CHAR(1)" # Uses 1 byte - - -class FAST_PYSQLITE_DATETIME(sqlite.DATETIME): - """Use ciso8601 to parse datetimes instead of sqlalchemy built-in regex.""" - - def result_processor(self, dialect: Dialect, coltype: Any) -> Callable | None: - """Offload the datetime parsing to ciso8601.""" - return lambda value: None if value is None else ciso8601.parse_datetime(value) - - -class NativeLargeBinary(LargeBinary): - """A faster version of LargeBinary for engines that support python bytes natively.""" - - def result_processor(self, dialect: Dialect, coltype: Any) -> Callable | None: - """No conversion needed for engines that support native bytes.""" - return None - - -# For MariaDB and MySQL we can use an unsigned integer type since it will fit 2**32 -# for sqlite and postgresql we use a bigint -UINT_32_TYPE = BigInteger().with_variant( - mysql.INTEGER(unsigned=True), # type: ignore[no-untyped-call] - "mysql", - "mariadb", -) -JSON_VARIANT_CAST = Text().with_variant( - postgresql.JSON(none_as_null=True), # type: ignore[no-untyped-call] - "postgresql", -) -JSONB_VARIANT_CAST = Text().with_variant( - postgresql.JSONB(none_as_null=True), # type: ignore[no-untyped-call] - "postgresql", -) -DATETIME_TYPE = ( - DateTime(timezone=True) - .with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql", "mariadb") # type: ignore[no-untyped-call] - .with_variant(FAST_PYSQLITE_DATETIME(), "sqlite") # type: ignore[no-untyped-call] -) -DOUBLE_TYPE = ( - Float() - .with_variant(mysql.DOUBLE(asdecimal=False), "mysql", "mariadb") # type: ignore[no-untyped-call] - .with_variant(oracle.DOUBLE_PRECISION(), "oracle") - .with_variant(postgresql.DOUBLE_PRECISION(), "postgresql") -) -UNUSED_LEGACY_COLUMN = Unused(0) -UNUSED_LEGACY_DATETIME_COLUMN = UnusedDateTime(timezone=True) -UNUSED_LEGACY_INTEGER_COLUMN = SmallInteger() -DOUBLE_PRECISION_TYPE_SQL = "DOUBLE PRECISION" -CONTEXT_BINARY_TYPE = LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH).with_variant( - NativeLargeBinary(CONTEXT_ID_BIN_MAX_LENGTH), "mysql", "mariadb", "sqlite" -) - -TIMESTAMP_TYPE = DOUBLE_TYPE - - -class JSONLiteral(JSON): - """Teach SA how to literalize json.""" - - def literal_processor(self, dialect: Dialect) -> Callable[[Any], str]: - """Processor to convert a value to JSON.""" - - def process(value: Any) -> str: - """Dump json.""" - return JSON_DUMP(value) - - return process - - -EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote] -EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)} - - -class Events(Base): - """Event history data.""" - - __table_args__ = ( - # Used for fetching events at a 
specific time - # see logbook - Index( - "ix_events_event_type_id_time_fired_ts", "event_type_id", "time_fired_ts" - ), - Index( - EVENTS_CONTEXT_ID_BIN_INDEX, - "context_id_bin", - mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, - mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, - ), - _DEFAULT_TABLE_ARGS, - ) - __tablename__ = TABLE_EVENTS - event_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - event_type: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - event_data: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - origin: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - origin_idx: Mapped[int | None] = mapped_column(SmallInteger) - time_fired: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN) - time_fired_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True) - context_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - context_user_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - context_parent_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - data_id: Mapped[int | None] = mapped_column( - Integer, ForeignKey("event_data.data_id"), index=True - ) - context_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE) - context_user_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE) - context_parent_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE) - event_type_id: Mapped[int | None] = mapped_column( - Integer, ForeignKey("event_types.event_type_id") - ) - event_data_rel: Mapped[EventData | None] = relationship("EventData") - event_type_rel: Mapped[EventTypes | None] = relationship("EventTypes") - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - return ( - "" - ) - - @property - def _time_fired_isotime(self) -> str | None: - """Return time_fired as an isotime string.""" - date_time: datetime | None - if self.time_fired_ts is not None: - date_time = dt_util.utc_from_timestamp(self.time_fired_ts) - else: - date_time = process_timestamp(self.time_fired) - if date_time is None: - return None - return date_time.isoformat(sep=" ", timespec="seconds") - - @staticmethod - def from_event(event: Event) -> Events: - """Create an event database object from a native event.""" - return Events( - event_type=None, - event_data=None, - origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin), - time_fired=None, - time_fired_ts=event.time_fired_timestamp, - context_id=None, - context_id_bin=ulid_to_bytes_or_none(event.context.id), - context_user_id=None, - context_user_id_bin=uuid_hex_to_bytes_or_none(event.context.user_id), - context_parent_id=None, - context_parent_id_bin=ulid_to_bytes_or_none(event.context.parent_id), - ) - - def to_native(self, validate_entity_id: bool = True) -> Event | None: - """Convert to a native HA Event.""" - context = Context( - id=bytes_to_ulid_or_none(self.context_id_bin), - user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin), - parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin), - ) - try: - return Event( - self.event_type or "", - json_loads_object(self.event_data) if self.event_data else {}, - EventOrigin(self.origin) - if self.origin - else EVENT_ORIGIN_ORDER[self.origin_idx or 0], - dt_util.utc_from_timestamp(self.time_fired_ts or 0), - context=context, - ) - except JSON_DECODE_EXCEPTIONS: - # When json_loads fails - _LOGGER.exception("Error converting to event: %s", self) - return None - - -class EventData(Base): - """Event data history.""" - - __table_args__ = 
(_DEFAULT_TABLE_ARGS,)
-    __tablename__ = TABLE_EVENT_DATA
-    data_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
-    hash: Mapped[int | None] = mapped_column(UINT_32_TYPE, index=True)
-    # Note that this is not named attributes to avoid confusion with the states table
-    shared_data: Mapped[str | None] = mapped_column(
-        Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
-    )
-
-    def __repr__(self) -> str:
-        """Return string representation of instance for debugging."""
-        return (
-            ""
-        )
-
-    @staticmethod
-    def shared_data_bytes_from_event(
-        event: Event, dialect: SupportedDialect | None
-    ) -> bytes:
-        """Create shared_data from an event."""
-        if dialect == SupportedDialect.POSTGRESQL:
-            bytes_result = json_bytes_strip_null(event.data)
-        else:
-            bytes_result = json_bytes(event.data)
-        if len(bytes_result) > MAX_EVENT_DATA_BYTES:
-            _LOGGER.warning(
-                "Event data for %s exceed maximum size of %s bytes. "
-                "This can cause database performance issues; Event data "
-                "will not be stored",
-                event.event_type,
-                MAX_EVENT_DATA_BYTES,
-            )
-            return b"{}"
-        return bytes_result
-
-    @staticmethod
-    def hash_shared_data_bytes(shared_data_bytes: bytes) -> int:
-        """Return the hash of json encoded shared data."""
-        return fnv1a_32(shared_data_bytes)
-
-    def to_native(self) -> dict[str, Any]:
-        """Convert to an event data dictionary."""
-        shared_data = self.shared_data
-        if shared_data is None:
-            return {}
-        try:
-            return cast(dict[str, Any], json_loads(shared_data))
-        except JSON_DECODE_EXCEPTIONS:
-            _LOGGER.exception("Error converting row to event data: %s", self)
-            return {}
-
-
-class EventTypes(Base):
-    """Event type history."""
-
-    __table_args__ = (_DEFAULT_TABLE_ARGS,)
-    __tablename__ = TABLE_EVENT_TYPES
-    event_type_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
-    event_type: Mapped[str | None] = mapped_column(
-        String(MAX_LENGTH_EVENT_EVENT_TYPE), index=True, unique=True
-    )
-
-    def __repr__(self) -> str:
-        """Return string representation of instance for debugging."""
-        return (
-            ""
-        )
-
-
-class States(Base):
-    """State change history."""
-
-    __table_args__ = (
-        # Used for fetching the state of entities at a specific time
-        # (get_states in history.py)
-        Index(METADATA_ID_LAST_UPDATED_INDEX_TS, "metadata_id", "last_updated_ts"),
-        Index(
-            STATES_CONTEXT_ID_BIN_INDEX,
-            "context_id_bin",
-            mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
-            mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
-        ),
-        _DEFAULT_TABLE_ARGS,
-    )
-    __tablename__ = TABLE_STATES
-    state_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
-    entity_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
-    state: Mapped[str | None] = mapped_column(String(MAX_LENGTH_STATE_STATE))
-    attributes: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
-    event_id: Mapped[int | None] = mapped_column(UNUSED_LEGACY_INTEGER_COLUMN)
-    last_changed: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
-    last_changed_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
-    last_updated: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
-    last_updated_ts: Mapped[float | None] = mapped_column(
-        TIMESTAMP_TYPE, default=time.time, index=True
-    )
-    old_state_id: Mapped[int | None] = mapped_column(
-        Integer, ForeignKey("states.state_id"), index=True
-    )
-    attributes_id: Mapped[int | None] = mapped_column(
-        Integer, ForeignKey("state_attributes.attributes_id"), index=True
-    )
-    context_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
-
context_user_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - context_parent_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN) - origin_idx: Mapped[int | None] = mapped_column( - SmallInteger - ) # 0 is local, 1 is remote - old_state: Mapped[States | None] = relationship("States", remote_side=[state_id]) - state_attributes: Mapped[StateAttributes | None] = relationship("StateAttributes") - context_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE) - context_user_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE) - context_parent_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE) - metadata_id: Mapped[int | None] = mapped_column( - Integer, ForeignKey("states_meta.metadata_id") - ) - states_meta_rel: Mapped[StatesMeta | None] = relationship("StatesMeta") - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - return ( - f"" - ) - - @property - def _last_updated_isotime(self) -> str | None: - """Return last_updated as an isotime string.""" - date_time: datetime | None - if self.last_updated_ts is not None: - date_time = dt_util.utc_from_timestamp(self.last_updated_ts) - else: - date_time = process_timestamp(self.last_updated) - if date_time is None: - return None - return date_time.isoformat(sep=" ", timespec="seconds") - - @staticmethod - def from_event(event: Event) -> States: - """Create object from a state_changed event.""" - state: State | None = event.data.get("new_state") - dbstate = States( - entity_id=None, - attributes=None, - context_id=None, - context_id_bin=ulid_to_bytes_or_none(event.context.id), - context_user_id=None, - context_user_id_bin=uuid_hex_to_bytes_or_none(event.context.user_id), - context_parent_id=None, - context_parent_id_bin=ulid_to_bytes_or_none(event.context.parent_id), - origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin), - last_updated=None, - last_changed=None, - ) - # None state means the state was removed from the state machine - if state is None: - dbstate.state = "" - dbstate.last_updated_ts = event.time_fired_timestamp - dbstate.last_changed_ts = None - return dbstate - - dbstate.state = state.state - dbstate.last_updated_ts = state.last_updated_timestamp - if state.last_updated == state.last_changed: - dbstate.last_changed_ts = None - else: - dbstate.last_changed_ts = state.last_changed_timestamp - - return dbstate - - def to_native(self, validate_entity_id: bool = True) -> State | None: - """Convert to an HA state object.""" - context = Context( - id=bytes_to_ulid_or_none(self.context_id_bin), - user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin), - parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin), - ) - try: - attrs = json_loads_object(self.attributes) if self.attributes else {} - except JSON_DECODE_EXCEPTIONS: - # When json_loads fails - _LOGGER.exception("Error converting row to state: %s", self) - return None - if self.last_changed_ts is None or self.last_changed_ts == self.last_updated_ts: - last_changed = last_updated = dt_util.utc_from_timestamp( - self.last_updated_ts or 0 - ) - else: - last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0) - last_changed = dt_util.utc_from_timestamp(self.last_changed_ts or 0) - return State( - self.entity_id or "", - self.state, # type: ignore[arg-type] - # Join the state_attributes table on attributes_id to get the attributes - # for newer states - attrs, - last_changed, - last_updated, - context=context, - validate_entity_id=validate_entity_id, - ) - - -class 
StateAttributes(Base): - """State attribute change history.""" - - __table_args__ = (_DEFAULT_TABLE_ARGS,) - __tablename__ = TABLE_STATE_ATTRIBUTES - attributes_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - hash: Mapped[int | None] = mapped_column(UINT_32_TYPE, index=True) - # Note that this is not named attributes to avoid confusion with the states table - shared_attrs: Mapped[str | None] = mapped_column( - Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb") - ) - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - return ( - f"" - ) - - @staticmethod - def shared_attrs_bytes_from_event( - event: Event, - dialect: SupportedDialect | None, - ) -> bytes: - """Create shared_attrs from a state_changed event.""" - state: State | None = event.data.get("new_state") - # None state means the state was removed from the state machine - if state is None: - return b"{}" - if state_info := state.state_info: - unrecorded_attributes = state_info["unrecorded_attributes"] - exclude_attrs = { - *ALL_DOMAIN_EXCLUDE_ATTRS, - *unrecorded_attributes, - } - if MATCH_ALL in unrecorded_attributes: - # Don't exclude device class, state class, unit of measurement - # or friendly name when using the MATCH_ALL exclude constant - _exclude_attributes = { - k: v - for k, v in state.attributes.items() - if k - not in ( - ATTR_DEVICE_CLASS, - ATTR_STATE_CLASS, - ATTR_UNIT_OF_MEASUREMENT, - ATTR_FRIENDLY_NAME, - ) - } - exclude_attrs.update(_exclude_attributes) - - else: - exclude_attrs = ALL_DOMAIN_EXCLUDE_ATTRS - encoder = json_bytes_strip_null if dialect == PSQL_DIALECT else json_bytes - bytes_result = encoder( - {k: v for k, v in state.attributes.items() if k not in exclude_attrs} - ) - if len(bytes_result) > MAX_STATE_ATTRS_BYTES: - _LOGGER.warning( - "State attributes for %s exceed maximum size of %s bytes. 
" - "This can cause database performance issues; Attributes " - "will not be stored", - state.entity_id, - MAX_STATE_ATTRS_BYTES, - ) - return b"{}" - return bytes_result - - @staticmethod - def hash_shared_attrs_bytes(shared_attrs_bytes: bytes) -> int: - """Return the hash of json encoded shared attributes.""" - return fnv1a_32(shared_attrs_bytes) - - def to_native(self) -> dict[str, Any]: - """Convert to a state attributes dictionary.""" - shared_attrs = self.shared_attrs - if shared_attrs is None: - return {} - try: - return cast(dict[str, Any], json_loads(shared_attrs)) - except JSON_DECODE_EXCEPTIONS: - # When json_loads fails - _LOGGER.exception("Error converting row to state attributes: %s", self) - return {} - - -class StatesMeta(Base): - """Metadata for states.""" - - __table_args__ = (_DEFAULT_TABLE_ARGS,) - __tablename__ = TABLE_STATES_META - metadata_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - entity_id: Mapped[str | None] = mapped_column( - String(MAX_LENGTH_STATE_ENTITY_ID), index=True, unique=True - ) - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - return ( - "" - ) - - -class StatisticsBase: - """Statistics base class.""" - - id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - created: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN) - created_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, default=time.time) - metadata_id: Mapped[int | None] = mapped_column( - Integer, - ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"), - ) - start: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN) - start_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True) - mean: Mapped[float | None] = mapped_column(DOUBLE_TYPE) - min: Mapped[float | None] = mapped_column(DOUBLE_TYPE) - max: Mapped[float | None] = mapped_column(DOUBLE_TYPE) - last_reset: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN) - last_reset_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE) - state: Mapped[float | None] = mapped_column(DOUBLE_TYPE) - sum: Mapped[float | None] = mapped_column(DOUBLE_TYPE) - - duration: timedelta - - @classmethod - def from_stats(cls, metadata_id: int, stats: StatisticData) -> Self: - """Create object from a statistics with datatime objects.""" - return cls( # type: ignore[call-arg] - metadata_id=metadata_id, - created=None, - created_ts=time.time(), - start=None, - start_ts=stats["start"].timestamp(), - mean=stats.get("mean"), - min=stats.get("min"), - max=stats.get("max"), - last_reset=None, - last_reset_ts=datetime_to_timestamp_or_none(stats.get("last_reset")), - state=stats.get("state"), - sum=stats.get("sum"), - ) - - @classmethod - def from_stats_ts(cls, metadata_id: int, stats: StatisticDataTimestamp) -> Self: - """Create object from a statistics with timestamps.""" - return cls( # type: ignore[call-arg] - metadata_id=metadata_id, - created=None, - created_ts=time.time(), - start=None, - start_ts=stats["start_ts"], - mean=stats.get("mean"), - min=stats.get("min"), - max=stats.get("max"), - last_reset=None, - last_reset_ts=stats.get("last_reset_ts"), - state=stats.get("state"), - sum=stats.get("sum"), - ) - - -class Statistics(Base, StatisticsBase): - """Long term statistics.""" - - duration = timedelta(hours=1) - - __table_args__ = ( - # Used for fetching statistics for a certain entity at a specific time - Index( - "ix_statistics_statistic_id_start_ts", - "metadata_id", - "start_ts", - 
unique=True, - ), - ) - __tablename__ = TABLE_STATISTICS - - -class StatisticsShortTerm(Base, StatisticsBase): - """Short term statistics.""" - - duration = timedelta(minutes=5) - - __table_args__ = ( - # Used for fetching statistics for a certain entity at a specific time - Index( - "ix_statistics_short_term_statistic_id_start_ts", - "metadata_id", - "start_ts", - unique=True, - ), - ) - __tablename__ = TABLE_STATISTICS_SHORT_TERM - - -class StatisticsMeta(Base): - """Statistics meta data.""" - - __table_args__ = (_DEFAULT_TABLE_ARGS,) - __tablename__ = TABLE_STATISTICS_META - id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - statistic_id: Mapped[str | None] = mapped_column( - String(255), index=True, unique=True - ) - source: Mapped[str | None] = mapped_column(String(32)) - unit_of_measurement: Mapped[str | None] = mapped_column(String(255)) - has_mean: Mapped[bool | None] = mapped_column(Boolean) - has_sum: Mapped[bool | None] = mapped_column(Boolean) - name: Mapped[str | None] = mapped_column(String(255)) - - @staticmethod - def from_meta(meta: StatisticMetaData) -> StatisticsMeta: - """Create object from meta data.""" - return StatisticsMeta(**meta) - - -class RecorderRuns(Base): - """Representation of recorder run.""" - - __table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),) - __tablename__ = TABLE_RECORDER_RUNS - run_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - start: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow) - end: Mapped[datetime | None] = mapped_column(DATETIME_TYPE) - closed_incorrect: Mapped[bool] = mapped_column(Boolean, default=False) - created: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow) - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - end = ( - f"'{self.end.isoformat(sep=' ', timespec='seconds')}'" if self.end else None - ) - return ( - f"" - ) - - def to_native(self, validate_entity_id: bool = True) -> Self: - """Return self, native format is this model.""" - return self - - -class SchemaChanges(Base): - """Representation of schema version changes.""" - - __tablename__ = TABLE_SCHEMA_CHANGES - change_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - schema_version: Mapped[int | None] = mapped_column(Integer) - changed: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow) - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - return ( - "" - ) - - -class StatisticsRuns(Base): - """Representation of statistics run.""" - - __tablename__ = TABLE_STATISTICS_RUNS - run_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - start: Mapped[datetime] = mapped_column(DATETIME_TYPE, index=True) - - def __repr__(self) -> str: - """Return string representation of instance for debugging.""" - return ( - f"" - ) - - -EVENT_DATA_JSON = type_coerce( - EventData.shared_data.cast(JSONB_VARIANT_CAST), JSONLiteral(none_as_null=True) -) -OLD_FORMAT_EVENT_DATA_JSON = type_coerce( - Events.event_data.cast(JSONB_VARIANT_CAST), JSONLiteral(none_as_null=True) -) - -SHARED_ATTRS_JSON = type_coerce( - StateAttributes.shared_attrs.cast(JSON_VARIANT_CAST), JSON(none_as_null=True) -) -OLD_FORMAT_ATTRS_JSON = type_coerce( - States.attributes.cast(JSON_VARIANT_CAST), JSON(none_as_null=True) -) - -ENTITY_ID_IN_EVENT: ColumnElement = EVENT_DATA_JSON["entity_id"] -OLD_ENTITY_ID_IN_EVENT: ColumnElement = 
OLD_FORMAT_EVENT_DATA_JSON["entity_id"] -DEVICE_ID_IN_EVENT: ColumnElement = EVENT_DATA_JSON["device_id"] -OLD_STATE = aliased(States, name="old_state") - -SHARED_ATTR_OR_LEGACY_ATTRIBUTES = case( - (StateAttributes.shared_attrs.is_(None), States.attributes), - else_=StateAttributes.shared_attrs, -).label("attributes") -SHARED_DATA_OR_LEGACY_EVENT_DATA = case( - (EventData.shared_data.is_(None), Events.event_data), else_=EventData.shared_data -).label("event_data") diff --git a/tests/components/recorder/test_history_db_schema_42.py b/tests/components/recorder/test_history_db_schema_42.py deleted file mode 100644 index 20d0c162d35..00000000000 --- a/tests/components/recorder/test_history_db_schema_42.py +++ /dev/null @@ -1,1022 +0,0 @@ -"""The tests the History component.""" - -from __future__ import annotations - -from collections.abc import Generator -from copy import copy -from datetime import datetime, timedelta -import json -from unittest.mock import sentinel - -from freezegun import freeze_time -import pytest - -from homeassistant import core as ha -from homeassistant.components import recorder -from homeassistant.components.recorder import Recorder, history -from homeassistant.components.recorder.filters import Filters -from homeassistant.components.recorder.models import process_timestamp -from homeassistant.components.recorder.util import session_scope -from homeassistant.core import HomeAssistant, State -from homeassistant.helpers.json import JSONEncoder -from homeassistant.util import dt as dt_util - -from .common import ( - assert_dict_of_states_equal_without_context_and_last_changed, - assert_multiple_states_equal_without_context, - assert_multiple_states_equal_without_context_and_last_changed, - assert_states_equal_without_context, - async_recorder_block_till_done, - async_wait_recording_done, - old_db_schema, -) -from .db_schema_42 import StateAttributes, States, StatesMeta - -from tests.typing import RecorderInstanceContextManager - - -@pytest.fixture -async def mock_recorder_before_hass( - async_test_recorder: RecorderInstanceContextManager, -) -> None: - """Set up recorder.""" - - -@pytest.fixture(autouse=True) -def db_schema_42(hass: HomeAssistant) -> Generator[None]: - """Fixture to initialize the db with the old schema 42.""" - with old_db_schema(hass, "42"): - yield - - -@pytest.fixture(autouse=True) -def setup_recorder(db_schema_42, recorder_mock: Recorder) -> recorder.Recorder: - """Set up recorder.""" - - -async def test_get_full_significant_states_with_session_entity_no_matches( - hass: HomeAssistant, -) -> None: - """Test getting states at a specific point in time for entities that never have been recorded.""" - now = dt_util.utcnow() - time_before_recorder_ran = now - timedelta(days=1000) - with session_scope(hass=hass, read_only=True) as session: - assert ( - history.get_full_significant_states_with_session( - hass, session, time_before_recorder_ran, now, entity_ids=["demo.id"] - ) - == {} - ) - assert ( - history.get_full_significant_states_with_session( - hass, - session, - time_before_recorder_ran, - now, - entity_ids=["demo.id", "demo.id2"], - ) - == {} - ) - - -async def test_significant_states_with_session_entity_minimal_response_no_matches( - hass: HomeAssistant, -) -> None: - """Test getting states at a specific point in time for entities that never have been recorded.""" - now = dt_util.utcnow() - time_before_recorder_ran = now - timedelta(days=1000) - with session_scope(hass=hass, read_only=True) as session: - assert ( - 
history.get_significant_states_with_session( - hass, - session, - time_before_recorder_ran, - now, - entity_ids=["demo.id"], - minimal_response=True, - ) - == {} - ) - assert ( - history.get_significant_states_with_session( - hass, - session, - time_before_recorder_ran, - now, - entity_ids=["demo.id", "demo.id2"], - minimal_response=True, - ) - == {} - ) - - -async def test_significant_states_with_session_single_entity( - hass: HomeAssistant, -) -> None: - """Test get_significant_states_with_session with a single entity.""" - hass.states.async_set("demo.id", "any", {"attr": True}) - hass.states.async_set("demo.id", "any2", {"attr": True}) - await async_wait_recording_done(hass) - now = dt_util.utcnow() - with session_scope(hass=hass, read_only=True) as session: - states = history.get_significant_states_with_session( - hass, - session, - now - timedelta(days=1), - now, - entity_ids=["demo.id"], - minimal_response=False, - ) - assert len(states["demo.id"]) == 2 - - -@pytest.mark.parametrize( - ("attributes", "no_attributes", "limit"), - [ - ({"attr": True}, False, 5000), - ({}, True, 5000), - ({"attr": True}, False, 3), - ({}, True, 3), - ], -) -async def test_state_changes_during_period( - hass: HomeAssistant, attributes, no_attributes, limit -) -> None: - """Test state change during period.""" - entity_id = "media_player.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state, attributes) - return hass.states.get(entity_id) - - start = dt_util.utcnow() - point = start + timedelta(seconds=1) - end = point + timedelta(seconds=1) - - with freeze_time(start) as freezer: - set_state("idle") - set_state("YouTube") - - freezer.move_to(point) - states = [ - set_state("idle"), - set_state("Netflix"), - set_state("Plex"), - set_state("YouTube"), - ] - - freezer.move_to(end) - set_state("Netflix") - set_state("Plex") - await async_wait_recording_done(hass) - - hist = history.state_changes_during_period( - hass, start, end, entity_id, no_attributes, limit=limit - ) - - assert_multiple_states_equal_without_context(states[:limit], hist[entity_id]) - - -async def test_state_changes_during_period_last_reported( - hass: HomeAssistant, -) -> None: - """Test state change during period.""" - entity_id = "media_player.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state) - return ha.State.from_dict(hass.states.get(entity_id).as_dict()) - - start = dt_util.utcnow() - point1 = start + timedelta(seconds=1) - point2 = point1 + timedelta(seconds=1) - end = point2 + timedelta(seconds=1) - - with freeze_time(start) as freezer: - set_state("idle") - - freezer.move_to(point1) - states = [set_state("YouTube")] - - freezer.move_to(point2) - set_state("YouTube") - - freezer.move_to(end) - set_state("Netflix") - await async_wait_recording_done(hass) - - hist = history.state_changes_during_period(hass, start, end, entity_id) - - assert_multiple_states_equal_without_context(states, hist[entity_id]) - - -async def test_state_changes_during_period_descending( - hass: HomeAssistant, -) -> None: - """Test state change during period descending.""" - entity_id = "media_player.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state, {"any": 1}) - return hass.states.get(entity_id) - - start = dt_util.utcnow().replace(microsecond=0) - point = start + timedelta(seconds=1) - point2 = start + timedelta(seconds=1, microseconds=100) - point3 = start + timedelta(seconds=1, microseconds=200) - point4 = start + 
timedelta(seconds=1, microseconds=300) - end = point + timedelta(seconds=1, microseconds=400) - - with freeze_time(start) as freezer: - set_state("idle") - set_state("YouTube") - - freezer.move_to(point) - states = [set_state("idle")] - - freezer.move_to(point2) - states.append(set_state("Netflix")) - - freezer.move_to(point3) - states.append(set_state("Plex")) - - freezer.move_to(point4) - states.append(set_state("YouTube")) - - freezer.move_to(end) - set_state("Netflix") - set_state("Plex") - await async_wait_recording_done(hass) - - hist = history.state_changes_during_period( - hass, start, end, entity_id, no_attributes=False, descending=False - ) - - assert_multiple_states_equal_without_context(states, hist[entity_id]) - - hist = history.state_changes_during_period( - hass, start, end, entity_id, no_attributes=False, descending=True - ) - assert_multiple_states_equal_without_context( - states, list(reversed(list(hist[entity_id]))) - ) - - start_time = point2 + timedelta(microseconds=10) - hist = history.state_changes_during_period( - hass, - start_time, # Pick a point where we will generate a start time state - end, - entity_id, - no_attributes=False, - descending=True, - include_start_time_state=True, - ) - hist_states = list(hist[entity_id]) - assert hist_states[-1].last_updated == start_time - assert hist_states[-1].last_changed == start_time - assert len(hist_states) == 3 - # Make sure they are in descending order - assert ( - hist_states[0].last_updated - > hist_states[1].last_updated - > hist_states[2].last_updated - ) - assert ( - hist_states[0].last_changed - > hist_states[1].last_changed - > hist_states[2].last_changed - ) - hist = history.state_changes_during_period( - hass, - start_time, # Pick a point where we will generate a start time state - end, - entity_id, - no_attributes=False, - descending=False, - include_start_time_state=True, - ) - hist_states = list(hist[entity_id]) - assert hist_states[0].last_updated == start_time - assert hist_states[0].last_changed == start_time - assert len(hist_states) == 3 - # Make sure they are in ascending order - assert ( - hist_states[0].last_updated - < hist_states[1].last_updated - < hist_states[2].last_updated - ) - assert ( - hist_states[0].last_changed - < hist_states[1].last_changed - < hist_states[2].last_changed - ) - - -async def test_get_last_state_changes(hass: HomeAssistant) -> None: - """Test number of state changes.""" - entity_id = "sensor.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state) - return hass.states.get(entity_id) - - start = dt_util.utcnow() - timedelta(minutes=2) - point = start + timedelta(minutes=1) - point2 = point + timedelta(minutes=1, seconds=1) - states = [] - - with freeze_time(start) as freezer: - set_state("1") - - freezer.move_to(point) - states.append(set_state("2")) - - freezer.move_to(point2) - states.append(set_state("3")) - await async_wait_recording_done(hass) - - hist = history.get_last_state_changes(hass, 2, entity_id) - - assert_multiple_states_equal_without_context(states, hist[entity_id]) - - -async def test_get_last_state_changes_last_reported( - hass: HomeAssistant, -) -> None: - """Test number of state changes.""" - entity_id = "sensor.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state) - return ha.State.from_dict(hass.states.get(entity_id).as_dict()) - - start = dt_util.utcnow() - timedelta(minutes=2) - point = start + timedelta(minutes=1) - point2 = point + timedelta(minutes=1, seconds=1) - 
states = [] - - with freeze_time(start) as freezer: - states.append(set_state("1")) - - freezer.move_to(point) - set_state("1") - - freezer.move_to(point2) - states.append(set_state("2")) - await async_wait_recording_done(hass) - - hist = history.get_last_state_changes(hass, 2, entity_id) - - assert_multiple_states_equal_without_context(states, hist[entity_id]) - - -async def test_get_last_state_change(hass: HomeAssistant) -> None: - """Test getting the last state change for an entity.""" - entity_id = "sensor.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state) - return hass.states.get(entity_id) - - start = dt_util.utcnow() - timedelta(minutes=2) - point = start + timedelta(minutes=1) - point2 = point + timedelta(minutes=1, seconds=1) - states = [] - - with freeze_time(start) as freezer: - set_state("1") - - freezer.move_to(point) - set_state("2") - - freezer.move_to(point2) - states.append(set_state("3")) - await async_wait_recording_done(hass) - - hist = history.get_last_state_changes(hass, 1, entity_id) - - assert_multiple_states_equal_without_context(states, hist[entity_id]) - - -async def test_ensure_state_can_be_copied( - hass: HomeAssistant, -) -> None: - """Ensure a state can pass though copy(). - - The filter integration uses copy() on states - from history. - """ - entity_id = "sensor.test" - - def set_state(state): - """Set the state.""" - hass.states.async_set(entity_id, state) - return hass.states.get(entity_id) - - start = dt_util.utcnow() - timedelta(minutes=2) - point = start + timedelta(minutes=1) - - with freeze_time(start) as freezer: - set_state("1") - - freezer.move_to(point) - set_state("2") - await async_wait_recording_done(hass) - - hist = history.get_last_state_changes(hass, 2, entity_id) - - assert_states_equal_without_context(copy(hist[entity_id][0]), hist[entity_id][0]) - assert_states_equal_without_context(copy(hist[entity_id][1]), hist[entity_id][1]) - - -async def test_get_significant_states(hass: HomeAssistant) -> None: - """Test that only significant states are returned. - - We should get back every thermostat change that - includes an attribute change, but only the state updates for - media player (attribute changes are not significant and not returned). - """ - zero, four, states = record_states(hass) - await async_wait_recording_done(hass) - - hist = history.get_significant_states(hass, zero, four, entity_ids=list(states)) - assert_dict_of_states_equal_without_context_and_last_changed(states, hist) - - -async def test_get_significant_states_minimal_response( - hass: HomeAssistant, -) -> None: - """Test that only significant states are returned. - - When minimal responses is set only the first and - last states return a complete state. - - We should get back every thermostat change that - includes an attribute change, but only the state updates for - media player (attribute changes are not significant and not returned). - """ - zero, four, states = record_states(hass) - await async_wait_recording_done(hass) - - hist = history.get_significant_states( - hass, zero, four, minimal_response=True, entity_ids=list(states) - ) - entites_with_reducable_states = [ - "media_player.test", - "media_player.test3", - ] - - # All states for media_player.test state are reduced - # down to last_changed and state when minimal_response - # is set except for the first state. - # is set. 
We use JSONEncoder to make sure that are - # pre-encoded last_changed is always the same as what - # will happen with encoding a native state - for entity_id in entites_with_reducable_states: - entity_states = states[entity_id] - for state_idx in range(1, len(entity_states)): - input_state = entity_states[state_idx] - orig_last_changed = json.dumps( - process_timestamp(input_state.last_changed), - cls=JSONEncoder, - ).replace('"', "") - orig_state = input_state.state - entity_states[state_idx] = { - "last_changed": orig_last_changed, - "state": orig_state, - } - - assert len(hist) == len(states) - assert_states_equal_without_context( - states["media_player.test"][0], hist["media_player.test"][0] - ) - assert states["media_player.test"][1] == hist["media_player.test"][1] - assert states["media_player.test"][2] == hist["media_player.test"][2] - - assert_multiple_states_equal_without_context( - states["media_player.test2"], hist["media_player.test2"] - ) - assert_states_equal_without_context( - states["media_player.test3"][0], hist["media_player.test3"][0] - ) - assert states["media_player.test3"][1] == hist["media_player.test3"][1] - - assert_multiple_states_equal_without_context( - states["script.can_cancel_this_one"], hist["script.can_cancel_this_one"] - ) - assert_multiple_states_equal_without_context_and_last_changed( - states["thermostat.test"], hist["thermostat.test"] - ) - assert_multiple_states_equal_without_context_and_last_changed( - states["thermostat.test2"], hist["thermostat.test2"] - ) - - -@pytest.mark.parametrize("time_zone", ["Europe/Berlin", "US/Hawaii", "UTC"]) -async def test_get_significant_states_with_initial( - time_zone, hass: HomeAssistant -) -> None: - """Test that only significant states are returned. - - We should get back every thermostat change that - includes an attribute change, but only the state updates for - media player (attribute changes are not significant and not returned). - """ - await hass.config.async_set_time_zone(time_zone) - zero, four, states = record_states(hass) - await async_wait_recording_done(hass) - - one_and_half = zero + timedelta(seconds=1.5) - for entity_id in states: - if entity_id == "media_player.test": - states[entity_id] = states[entity_id][1:] - for state in states[entity_id]: - # If the state is recorded before the start time - # start it will have its last_updated and last_changed - # set to the start time. - if state.last_updated < one_and_half: - state.last_updated = one_and_half - state.last_changed = one_and_half - - hist = history.get_significant_states( - hass, one_and_half, four, include_start_time_state=True, entity_ids=list(states) - ) - assert_dict_of_states_equal_without_context_and_last_changed(states, hist) - - -async def test_get_significant_states_without_initial( - hass: HomeAssistant, -) -> None: - """Test that only significant states are returned. - - We should get back every thermostat change that - includes an attribute change, but only the state updates for - media player (attribute changes are not significant and not returned). 
- """ - zero, four, states = record_states(hass) - await async_wait_recording_done(hass) - - one = zero + timedelta(seconds=1) - one_with_microsecond = zero + timedelta(seconds=1, microseconds=1) - one_and_half = zero + timedelta(seconds=1.5) - for entity_id in states: - states[entity_id] = [ - s - for s in states[entity_id] - if s.last_changed not in (one, one_with_microsecond) - ] - del states["media_player.test2"] - del states["thermostat.test3"] - - hist = history.get_significant_states( - hass, - one_and_half, - four, - include_start_time_state=False, - entity_ids=list(states), - ) - assert_dict_of_states_equal_without_context_and_last_changed(states, hist) - - -async def test_get_significant_states_entity_id( - hass: HomeAssistant, -) -> None: - """Test that only significant states are returned for one entity.""" - zero, four, states = record_states(hass) - await async_wait_recording_done(hass) - - del states["media_player.test2"] - del states["media_player.test3"] - del states["thermostat.test"] - del states["thermostat.test2"] - del states["thermostat.test3"] - del states["script.can_cancel_this_one"] - - hist = history.get_significant_states(hass, zero, four, ["media_player.test"]) - assert_dict_of_states_equal_without_context_and_last_changed(states, hist) - - -async def test_get_significant_states_multiple_entity_ids( - hass: HomeAssistant, -) -> None: - """Test that only significant states are returned for one entity.""" - zero, four, states = record_states(hass) - await async_wait_recording_done(hass) - - hist = history.get_significant_states( - hass, - zero, - four, - ["media_player.test", "thermostat.test"], - ) - - assert_multiple_states_equal_without_context_and_last_changed( - states["media_player.test"], hist["media_player.test"] - ) - assert_multiple_states_equal_without_context_and_last_changed( - states["thermostat.test"], hist["thermostat.test"] - ) - - -async def test_get_significant_states_are_ordered( - hass: HomeAssistant, -) -> None: - """Test order of results from get_significant_states. - - When entity ids are given, the results should be returned with the data - in the same order. 
- """ - zero, four, _states = record_states(hass) - await async_wait_recording_done(hass) - - entity_ids = ["media_player.test", "media_player.test2"] - hist = history.get_significant_states(hass, zero, four, entity_ids) - assert list(hist.keys()) == entity_ids - entity_ids = ["media_player.test2", "media_player.test"] - hist = history.get_significant_states(hass, zero, four, entity_ids) - assert list(hist.keys()) == entity_ids - - -async def test_get_significant_states_only( - hass: HomeAssistant, -) -> None: - """Test significant states when significant_states_only is set.""" - entity_id = "sensor.test" - - def set_state(state, **kwargs): - """Set the state.""" - hass.states.async_set(entity_id, state, **kwargs) - return hass.states.get(entity_id) - - start = dt_util.utcnow() - timedelta(minutes=4) - points = [start + timedelta(minutes=i) for i in range(1, 4)] - - states = [] - with freeze_time(start) as freezer: - set_state("123", attributes={"attribute": 10.64}) - - freezer.move_to(points[0]) - # Attributes are different, state not - states.append(set_state("123", attributes={"attribute": 21.42})) - - freezer.move_to(points[1]) - # state is different, attributes not - states.append(set_state("32", attributes={"attribute": 21.42})) - - freezer.move_to(points[2]) - # everything is different - states.append(set_state("412", attributes={"attribute": 54.23})) - await async_wait_recording_done(hass) - - hist = history.get_significant_states( - hass, - start, - significant_changes_only=True, - entity_ids=list({state.entity_id for state in states}), - ) - - assert len(hist[entity_id]) == 2 - assert not any( - state.last_updated == states[0].last_updated for state in hist[entity_id] - ) - assert any( - state.last_updated == states[1].last_updated for state in hist[entity_id] - ) - assert any( - state.last_updated == states[2].last_updated for state in hist[entity_id] - ) - - hist = history.get_significant_states( - hass, - start, - significant_changes_only=False, - entity_ids=list({state.entity_id for state in states}), - ) - - assert len(hist[entity_id]) == 3 - assert_multiple_states_equal_without_context_and_last_changed( - states, hist[entity_id] - ) - - -async def test_get_significant_states_only_minimal_response( - hass: HomeAssistant, -) -> None: - """Test significant states when significant_states_only is True.""" - now = dt_util.utcnow() - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "changed"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "again"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) - await async_wait_recording_done(hass) - - hist = history.get_significant_states( - hass, - now, - minimal_response=True, - significant_changes_only=False, - entity_ids=["sensor.test"], - ) - assert len(hist["sensor.test"]) == 3 - - -def record_states( - hass: HomeAssistant, -) -> tuple[datetime, datetime, dict[str, list[State]]]: - """Record some test states. - - We inject a bunch of state updates from media player, zone and - thermostat. 
- """ - mp = "media_player.test" - mp2 = "media_player.test2" - mp3 = "media_player.test3" - therm = "thermostat.test" - therm2 = "thermostat.test2" - therm3 = "thermostat.test3" - zone = "zone.home" - script_c = "script.can_cancel_this_one" - - def set_state(entity_id, state, **kwargs): - """Set the state.""" - hass.states.async_set(entity_id, state, **kwargs) - return hass.states.get(entity_id) - - zero = dt_util.utcnow() - one = zero + timedelta(seconds=1) - two = one + timedelta(seconds=1) - three = two + timedelta(seconds=1) - four = three + timedelta(seconds=1) - - states = {therm: [], therm2: [], therm3: [], mp: [], mp2: [], mp3: [], script_c: []} - with freeze_time(one) as freezer: - states[mp].append( - set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) - ) - states[mp2].append( - set_state(mp2, "YouTube", attributes={"media_title": str(sentinel.mt2)}) - ) - states[mp3].append( - set_state(mp3, "idle", attributes={"media_title": str(sentinel.mt1)}) - ) - states[therm].append( - set_state(therm, 20, attributes={"current_temperature": 19.5}) - ) - # This state will be updated - set_state(therm3, 20, attributes={"current_temperature": 19.5}) - - freezer.move_to(one + timedelta(microseconds=1)) - states[mp].append( - set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)}) - ) - - freezer.move_to(two) - # This state will be skipped only different in time - set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)}) - # This state will be skipped because domain is excluded - set_state(zone, "zoning") - states[script_c].append( - set_state(script_c, "off", attributes={"can_cancel": True}) - ) - states[therm].append( - set_state(therm, 21, attributes={"current_temperature": 19.8}) - ) - states[therm2].append( - set_state(therm2, 20, attributes={"current_temperature": 19}) - ) - # This state will be updated - set_state(therm3, 20, attributes={"current_temperature": 19.5}) - - freezer.move_to(three) - states[mp].append( - set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)}) - ) - states[mp3].append( - set_state(mp3, "Netflix", attributes={"media_title": str(sentinel.mt3)}) - ) - # Attributes changed even though state is the same - states[therm].append( - set_state(therm, 21, attributes={"current_temperature": 20}) - ) - states[therm3].append( - set_state(therm3, 20, attributes={"current_temperature": 19.5}) - ) - - return zero, four, states - - -async def test_get_full_significant_states_handles_empty_last_changed( - hass: HomeAssistant, -) -> None: - """Test getting states when last_changed is null.""" - now = dt_util.utcnow() - hass.states.async_set("sensor.one", "on", {"attr": "original"}) - state0 = hass.states.get("sensor.one") - await hass.async_block_till_done() - hass.states.async_set("sensor.one", "on", {"attr": "new"}) - state1 = hass.states.get("sensor.one") - - assert state0.last_changed == state1.last_changed - assert state0.last_updated != state1.last_updated - await async_wait_recording_done(hass) - - def _get_entries(): - with session_scope(hass=hass, read_only=True) as session: - return history.get_full_significant_states_with_session( - hass, - session, - now, - dt_util.utcnow(), - entity_ids=["sensor.one"], - significant_changes_only=False, - ) - - states = await recorder.get_instance(hass).async_add_executor_job(_get_entries) - sensor_one_states: list[State] = states["sensor.one"] - assert_states_equal_without_context(sensor_one_states[0], state0) - assert_states_equal_without_context(sensor_one_states[1], 
state1) - assert sensor_one_states[0].last_changed == sensor_one_states[1].last_changed - assert sensor_one_states[0].last_updated != sensor_one_states[1].last_updated - - def _fetch_native_states() -> list[State]: - with session_scope(hass=hass, read_only=True) as session: - native_states = [] - db_state_attributes = { - state_attributes.attributes_id: state_attributes - for state_attributes in session.query(StateAttributes) - } - metadata_id_to_entity_id = { - states_meta.metadata_id: states_meta - for states_meta in session.query(StatesMeta) - } - for db_state in session.query(States): - db_state.entity_id = metadata_id_to_entity_id[ - db_state.metadata_id - ].entity_id - state = db_state.to_native() - state.attributes = db_state_attributes[ - db_state.attributes_id - ].to_native() - native_states.append(state) - return native_states - - native_sensor_one_states = await recorder.get_instance(hass).async_add_executor_job( - _fetch_native_states - ) - assert_states_equal_without_context(native_sensor_one_states[0], state0) - assert_states_equal_without_context(native_sensor_one_states[1], state1) - assert ( - native_sensor_one_states[0].last_changed - == native_sensor_one_states[1].last_changed - ) - assert ( - native_sensor_one_states[0].last_updated - != native_sensor_one_states[1].last_updated - ) - - def _fetch_db_states() -> list[States]: - with session_scope(hass=hass, read_only=True) as session: - states = list(session.query(States)) - session.expunge_all() - return states - - db_sensor_one_states = await recorder.get_instance(hass).async_add_executor_job( - _fetch_db_states - ) - assert db_sensor_one_states[0].last_changed is None - assert db_sensor_one_states[0].last_changed_ts is None - - assert ( - process_timestamp( - dt_util.utc_from_timestamp(db_sensor_one_states[1].last_changed_ts) - ) - == state0.last_changed - ) - assert db_sensor_one_states[0].last_updated_ts is not None - assert db_sensor_one_states[1].last_updated_ts is not None - assert ( - db_sensor_one_states[0].last_updated_ts - != db_sensor_one_states[1].last_updated_ts - ) - - -async def test_state_changes_during_period_multiple_entities_single_test( - hass: HomeAssistant, -) -> None: - """Test state change during period with multiple entities in the same test. - - This test ensures the sqlalchemy query cache does not - generate incorrect results. 
- """ - start = dt_util.utcnow() - test_entites = {f"sensor.{i}": str(i) for i in range(30)} - for entity_id, value in test_entites.items(): - hass.states.async_set(entity_id, value) - - await async_wait_recording_done(hass) - end = dt_util.utcnow() - - for entity_id, value in test_entites.items(): - hist = history.state_changes_during_period(hass, start, end, entity_id) - assert len(hist) == 1 - assert hist[entity_id][0].state == value - - -@pytest.mark.freeze_time("2039-01-19 03:14:07.555555-00:00") -async def test_get_full_significant_states_past_year_2038( - hass: HomeAssistant, -) -> None: - """Test we can store times past year 2038.""" - past_2038_time = dt_util.parse_datetime("2039-01-19 03:14:07.555555-00:00") - hass.states.async_set("sensor.one", "on", {"attr": "original"}) - state0 = hass.states.get("sensor.one") - await hass.async_block_till_done() - - hass.states.async_set("sensor.one", "on", {"attr": "new"}) - state1 = hass.states.get("sensor.one") - - await async_wait_recording_done(hass) - - def _get_entries(): - with session_scope(hass=hass, read_only=True) as session: - return history.get_full_significant_states_with_session( - hass, - session, - past_2038_time - timedelta(days=365), - past_2038_time + timedelta(days=365), - entity_ids=["sensor.one"], - significant_changes_only=False, - ) - - states = await recorder.get_instance(hass).async_add_executor_job(_get_entries) - sensor_one_states: list[State] = states["sensor.one"] - assert_states_equal_without_context(sensor_one_states[0], state0) - assert_states_equal_without_context(sensor_one_states[1], state1) - assert sensor_one_states[0].last_changed == past_2038_time - assert sensor_one_states[0].last_updated == past_2038_time - - -async def test_get_significant_states_without_entity_ids_raises( - hass: HomeAssistant, -) -> None: - """Test at least one entity id is required for get_significant_states.""" - now = dt_util.utcnow() - with pytest.raises(ValueError, match="entity_ids must be provided"): - history.get_significant_states(hass, now, None) - - -async def test_state_changes_during_period_without_entity_ids_raises( - hass: HomeAssistant, -) -> None: - """Test at least one entity id is required for state_changes_during_period.""" - now = dt_util.utcnow() - with pytest.raises(ValueError, match="entity_id must be provided"): - history.state_changes_during_period(hass, now, None) - - -async def test_get_significant_states_with_filters_raises( - hass: HomeAssistant, -) -> None: - """Test passing filters is no longer supported.""" - now = dt_util.utcnow() - with pytest.raises(NotImplementedError, match="Filters are no longer supported"): - history.get_significant_states( - hass, now, None, ["media_player.test"], Filters() - ) - - -async def test_get_significant_states_with_non_existent_entity_ids_returns_empty( - hass: HomeAssistant, -) -> None: - """Test get_significant_states returns an empty dict when entities not in the db.""" - now = dt_util.utcnow() - assert history.get_significant_states(hass, now, None, ["nonexistent.entity"]) == {} - - -async def test_state_changes_during_period_with_non_existent_entity_ids_returns_empty( - hass: HomeAssistant, -) -> None: - """Test state_changes_during_period returns an empty dict when entities not in the db.""" - now = dt_util.utcnow() - assert ( - history.state_changes_during_period(hass, now, None, "nonexistent.entity") == {} - ) - - -async def test_get_last_state_changes_with_non_existent_entity_ids_returns_empty( - hass: HomeAssistant, -) -> None: - """Test 
get_last_state_changes returns an empty dict when entities not in the db.""" - assert history.get_last_state_changes(hass, 1, "nonexistent.entity") == {}