move wcs_coordinator/waste_collection_api in own file + readd fetch service for UI config

This commit is contained in:
5ila5
2024-08-16 16:57:11 +02:00
committed by 5ila5
parent 89978a9b94
commit 54dd924e44
5 changed files with 269 additions and 233 deletions

View File

@@ -1,27 +1,17 @@
"""Config flow setup logic."""
import logging
import site
from pathlib import Path
from random import randrange
from typing import Any
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import (
async_call_later,
async_track_time_change,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.core import HomeAssistant
package_dir = Path(__file__).resolve().parents[0]
site.addsitedir(str(package_dir))
from .service import get_fetch_all_service
from .wcs_coordinator import WCSCoordinator
from . import const # type: ignore # isort:skip # noqa: E402
from waste_collection_schedule import CollectionAggregator, SourceShell, Customize # type: ignore # isort:skip # noqa: E402
from .waste_collection_schedule import SourceShell, Customize # type: ignore # isort:skip # noqa: E402
_LOGGER = logging.getLogger(__name__)
@@ -84,6 +74,11 @@ async def async_setup_entry(hass: HomeAssistant, entry) -> bool:
entry.async_on_unload(entry.add_update_listener(async_update_listener))
# Register new Service fetch_data
hass.services.async_register(
const.DOMAIN, "fetch_data", get_fetch_all_service(hass), schema=vol.Schema({})
)
return True
@@ -126,103 +121,3 @@ async def async_migrate_entry(hass, config_entry: ConfigEntry) -> bool:
_LOGGER.debug("Migration to version %s successful", config_entry.version)
return True
class WCSCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Class to manage fetching data from waste collection service provider."""
_shell: SourceShell
_aggregator: CollectionAggregator
def __init__(
self,
hass: HomeAssistant,
source_shell: SourceShell,
separator,
fetch_time,
random_fetch_time_offset,
day_switch_time,
):
self._hass = hass
self._shell = source_shell
self._aggregator = CollectionAggregator([source_shell])
self._separator = separator
self._fetch_time = dt_util.parse_time(fetch_time)
self._random_fetch_time_offset = random_fetch_time_offset
self._day_switch_time = dt_util.parse_time(day_switch_time)
super().__init__(hass, _LOGGER, name=const.DOMAIN)
# start timer to fetch date once per day
self._fetch_tracker = async_track_time_change(
hass,
self._fetch_callback,
self._fetch_time.hour,
self._fetch_time.minute,
self._fetch_time.second,
)
# start timer for day-switch time
if self._day_switch_time != self._fetch_time:
async_track_time_change( # TODO: cancel on unload
hass,
self._update_sensors_callback,
self._day_switch_time.hour,
self._day_switch_time.minute,
self._day_switch_time.second,
)
# add a timer at midnight (if not already there) to update days-to
midnight = dt_util.parse_time("00:00")
if midnight != self._fetch_time and midnight != self._day_switch_time:
async_track_time_change( # TODO: cancel on unload
hass,
self._update_sensors_callback,
midnight.hour,
midnight.minute,
midnight.second,
)
async def _async_update_data(self) -> None:
"""Update data via library."""
await self._fetch_now()
@property
def shell(self):
return self._shell
@property
def separator(self):
return self._separator
@property
def day_switch_time(self):
return self._day_switch_time
@property
def device_info(self):
return DeviceInfo(
identifiers={(const.DOMAIN, f"{self.shell.unique_id}")},
name="Waste Collection Schedule",
manufacturer=self.shell.title,
model="Waste Collection Schedule",
entry_type=DeviceEntryType.SERVICE,
)
@callback
async def _fetch_callback(self, *_):
async_call_later(
self._hass,
randrange(0, 60 * self._random_fetch_time_offset),
self._fetch_now,
)
@callback
async def _update_sensors_callback(self, *_):
dispatcher_send(self._hass, const.UPDATE_SENSORS_SIGNAL)
async def _fetch_now(self, *_):
if self.shell:
await self._hass.async_add_executor_job(self.shell.fetch)
await self._update_sensors_callback()

View File

@@ -3,24 +3,20 @@
import logging
import site
from pathlib import Path
from random import randrange
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.core import HomeAssistant
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import async_call_later # isort:skip
from homeassistant.helpers.event import async_track_time_change # isort:skip
from .service import get_fetch_all_service
from .waste_collection_api import WasteCollectionApi
# add module directory to path
package_dir = Path(__file__).resolve().parents[0]
site.addsitedir(str(package_dir))
from . import const # type: ignore # isort:skip # noqa: E402
from waste_collection_schedule import Customize, SourceShell # type: ignore # isort:skip # noqa: E402
from waste_collection_schedule import Customize # type: ignore # isort:skip # noqa: E402
_LOGGER = logging.getLogger(__name__)
@@ -127,118 +123,9 @@ async def async_setup(hass: HomeAssistant, config: dict):
# initial fetch of all data
hass.add_job(api._fetch)
async def async_fetch_data(service: ServiceCall) -> None:
hass.add_job(api._fetch)
# Register new Service fetch_data
hass.services.async_register(
const.DOMAIN, "fetch_data", async_fetch_data, schema=vol.Schema({})
const.DOMAIN, "fetch_data", get_fetch_all_service(hass), schema=vol.Schema({})
)
return True
class WasteCollectionApi:
def __init__(
self, hass, separator, fetch_time, random_fetch_time_offset, day_switch_time
):
self._hass = hass
self._source_shells = []
self._separator = separator
self._fetch_time = fetch_time
self._random_fetch_time_offset = random_fetch_time_offset
self._day_switch_time = day_switch_time
# start timer to fetch date once per day
async_track_time_change(
hass,
self._fetch_callback,
self._fetch_time.hour,
self._fetch_time.minute,
self._fetch_time.second,
)
# start timer for day-switch time
if self._day_switch_time != self._fetch_time:
async_track_time_change(
hass,
self._update_sensors_callback,
self._day_switch_time.hour,
self._day_switch_time.minute,
self._day_switch_time.second,
)
# add a timer at midnight (if not already there) to update days-to
midnight = dt_util.parse_time("00:00")
if midnight != self._fetch_time and midnight != self._day_switch_time:
async_track_time_change(
hass,
self._update_sensors_callback,
midnight.hour,
midnight.minute,
midnight.second,
)
@property
def separator(self):
"""Separator string, used to separator waste types."""
return self._separator
@property
def fetch_time(self):
"""When to fetch to data."""
return self._fetch_time
@property
def day_switch_time(self):
"""When to hide entries for today."""
return self._day_switch_time
def add_source_shell(
self,
source_name,
customize,
source_args,
calendar_title,
day_offset,
):
new_shell = SourceShell.create(
source_name=source_name,
customize=customize,
source_args=source_args,
calendar_title=calendar_title,
day_offset=day_offset,
)
if new_shell:
self._source_shells.append(new_shell)
return new_shell
def _fetch(self, *_):
for shell in self._source_shells:
shell.fetch()
self._update_sensors_callback()
@property
def shells(self):
return self._source_shells
def get_shell(self, index):
return self._source_shells[index] if index < len(self._source_shells) else None
@callback
def _fetch_callback(self, *_):
async_call_later(
self._hass,
randrange(0, 60 * self._random_fetch_time_offset),
self._fetch_now_callback,
)
@callback
def _fetch_now_callback(self, *_):
self._hass.add_job(self._fetch)
@callback
def _update_sensors_callback(self, *_):
dispatcher_send(self._hass, const.UPDATE_SENSORS_SIGNAL)

View File

@@ -0,0 +1,16 @@
from homeassistant.core import HomeAssistant, ServiceCall
from . import const
from .waste_collection_api import WasteCollectionApi
from .wcs_coordinator import WCSCoordinator
def get_fetch_all_service(hass: HomeAssistant):
async def async_fetch_data(service: ServiceCall) -> None:
for entry_id, coordinator in hass.data[const.DOMAIN].items():
if isinstance(coordinator, WCSCoordinator):
hass.add_job(coordinator._fetch_now)
elif isinstance(coordinator, WasteCollectionApi):
hass.add_job(coordinator._fetch)
return async_fetch_data

View File

@@ -0,0 +1,120 @@
# This is the class organizing the different sources when using the yaml configuration
from random import randrange
import homeassistant.util.dt as dt_util
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import async_track_time_change
from . import const
from .waste_collection_schedule import SourceShell
from homeassistant.helpers.event import async_call_later # isort:skip
class WasteCollectionApi:
"""Class to manage the waste collection sources when using the yaml configuration."""
def __init__(
self, hass, separator, fetch_time, random_fetch_time_offset, day_switch_time
):
self._hass = hass
self._source_shells = []
self._separator = separator
self._fetch_time = fetch_time
self._random_fetch_time_offset = random_fetch_time_offset
self._day_switch_time = day_switch_time
# start timer to fetch date once per day
async_track_time_change(
hass,
self._fetch_callback,
self._fetch_time.hour,
self._fetch_time.minute,
self._fetch_time.second,
)
# start timer for day-switch time
if self._day_switch_time != self._fetch_time:
async_track_time_change(
hass,
self._update_sensors_callback,
self._day_switch_time.hour,
self._day_switch_time.minute,
self._day_switch_time.second,
)
# add a timer at midnight (if not already there) to update days-to
midnight = dt_util.parse_time("00:00")
if midnight != self._fetch_time and midnight != self._day_switch_time:
async_track_time_change(
hass,
self._update_sensors_callback,
midnight.hour,
midnight.minute,
midnight.second,
)
@property
def separator(self):
"""Separator string, used to separator waste types."""
return self._separator
@property
def fetch_time(self):
"""When to fetch to data."""
return self._fetch_time
@property
def day_switch_time(self):
"""When to hide entries for today."""
return self._day_switch_time
def add_source_shell(
self,
source_name,
customize,
source_args,
calendar_title,
day_offset,
):
new_shell = SourceShell.create(
source_name=source_name,
customize=customize,
source_args=source_args,
calendar_title=calendar_title,
day_offset=day_offset,
)
if new_shell:
self._source_shells.append(new_shell)
return new_shell
def _fetch(self, *_):
for shell in self._source_shells:
shell.fetch()
self._update_sensors_callback()
@property
def shells(self):
return self._source_shells
def get_shell(self, index):
return self._source_shells[index] if index < len(self._source_shells) else None
@callback
def _fetch_callback(self, *_):
async_call_later(
self._hass,
randrange(0, 60 * self._random_fetch_time_offset),
self._fetch_now_callback,
)
@callback
def _fetch_now_callback(self, *_):
self._hass.add_job(self._fetch)
@callback
def _update_sensors_callback(self, *_):
dispatcher_send(self._hass, const.UPDATE_SENSORS_SIGNAL)

View File

@@ -0,0 +1,118 @@
import logging
from random import randrange
from typing import Any
import homeassistant.util.dt as dt_util
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import (
async_call_later,
async_track_time_change,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from . import const
from .waste_collection_schedule import CollectionAggregator, SourceShell
_LOGGER = logging.getLogger(__name__)
class WCSCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Class to manage fetching data from waste collection service provider."""
_shell: SourceShell
_aggregator: CollectionAggregator
def __init__(
self,
hass: HomeAssistant,
source_shell: SourceShell,
separator,
fetch_time,
random_fetch_time_offset,
day_switch_time,
):
self._hass = hass
self._shell = source_shell
self._aggregator = CollectionAggregator([source_shell])
self._separator = separator
self._fetch_time = dt_util.parse_time(fetch_time)
self._random_fetch_time_offset = random_fetch_time_offset
self._day_switch_time = dt_util.parse_time(day_switch_time)
super().__init__(hass, _LOGGER, name=const.DOMAIN)
# start timer to fetch date once per day
self._fetch_tracker = async_track_time_change(
hass,
self._fetch_callback,
self._fetch_time.hour,
self._fetch_time.minute,
self._fetch_time.second,
)
# start timer for day-switch time
if self._day_switch_time != self._fetch_time:
async_track_time_change( # TODO: cancel on unload
hass,
self._update_sensors_callback,
self._day_switch_time.hour,
self._day_switch_time.minute,
self._day_switch_time.second,
)
# add a timer at midnight (if not already there) to update days-to
midnight = dt_util.parse_time("00:00")
if midnight != self._fetch_time and midnight != self._day_switch_time:
async_track_time_change( # TODO: cancel on unload
hass,
self._update_sensors_callback,
midnight.hour,
midnight.minute,
midnight.second,
)
async def _async_update_data(self) -> None:
"""Update data via library."""
await self._fetch_now()
@property
def shell(self):
return self._shell
@property
def separator(self):
return self._separator
@property
def day_switch_time(self):
return self._day_switch_time
@property
def device_info(self):
return DeviceInfo(
identifiers={(const.DOMAIN, f"{self.shell.unique_id}")},
name="Waste Collection Schedule",
manufacturer=self.shell.title,
model="Waste Collection Schedule",
entry_type=DeviceEntryType.SERVICE,
)
@callback
async def _fetch_callback(self, *_):
async_call_later(
self._hass,
randrange(0, 60 * self._random_fetch_time_offset),
self._fetch_now,
)
@callback
async def _update_sensors_callback(self, *_):
dispatcher_send(self._hass, const.UPDATE_SENSORS_SIGNAL)
async def _fetch_now(self, *_):
if self.shell:
await self._hass.async_add_executor_job(self.shell.fetch)
await self._update_sensors_callback()