add basic static source support

This commit is contained in:
5ila5
2024-06-16 17:31:15 +02:00
parent 32c27c5e85
commit b85c362001
6 changed files with 217 additions and 37 deletions

View File

@@ -14,6 +14,7 @@ from homeassistant.const import CONF_NAME, CONF_VALUE_TEMPLATE
from homeassistant.core import callback
from homeassistant.helpers.selector import (
IconSelector,
ObjectSelector,
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
@@ -21,6 +22,7 @@ from homeassistant.helpers.selector import (
TemplateSelector,
TextSelector,
TextSelectorConfig,
TextSelectorType,
TimeSelector,
)
from homeassistant.helpers.translation import async_get_translations
@@ -68,6 +70,15 @@ SUPPORTED_ARG_TYPES = {
int: cv.positive_int,
bool: cv.boolean,
list: TextSelector(TextSelectorConfig(multiple=True)),
list[str]: TextSelector(TextSelectorConfig(multiple=True)),
list[str | int]: TextSelector(TextSelectorConfig(multiple=True)),
list[date | str]: TextSelector(
TextSelectorConfig(multiple=True, type=TextSelectorType.DATE)
),
date | str: TextSelector(TextSelectorConfig(type=TextSelectorType.DATE)),
date | str | None: TextSelector(TextSelectorConfig(type=TextSelectorType.DATE)),
dict: ObjectSelector(),
str | int: cv.string,
date: cv.date,
datetime: cv.datetime,
}
@@ -381,8 +392,14 @@ class WasteCollectionConfigFlow(ConfigFlow, domain=DOMAIN): # type: ignore[call
): str,
}
MODULE_FLOW_TYPES = (
module.CONFIG_FLOW_TYPES if hasattr(module, "CONFIG_FLOW_TYPES") else {}
)
for arg in args:
default = args[arg].default
field_type = None
annotation = args[arg].annotation
description = None
if args_input is not None and args[arg].name in args_input:
@@ -398,23 +415,53 @@ class WasteCollectionConfigFlow(ConfigFlow, domain=DOMAIN): # type: ignore[call
"suggested_value": pre_filled[args[arg].name],
}
if default == inspect.Signature.empty and annotation != inspect._empty:
if (
default == inspect.Signature.empty or default is None
) and annotation != inspect._empty:
if annotation in SUPPORTED_ARG_TYPES:
default = annotation()
field_type = SUPPORTED_ARG_TYPES[annotation]
elif (
isinstance(annotation, types.GenericAlias)
and annotation.__origin__ in SUPPORTED_ARG_TYPES
):
field_type = SUPPORTED_ARG_TYPES[annotation.__origin__]
elif isinstance(annotation, types.UnionType):
for a in annotation.__args__:
_LOGGER.debug(f"{args[arg].name} UnionType: {a}, {type(a)}")
if a in SUPPORTED_ARG_TYPES:
default = a()
_LOGGER.debug(
f"set default to {default} for {args[arg].name} from UnionType"
)
if a == str: # prefer str over other types
break
field_type = SUPPORTED_ARG_TYPES[a]
elif (
isinstance(a, types.GenericAlias)
and a.__origin__ in SUPPORTED_ARG_TYPES
):
field_type = SUPPORTED_ARG_TYPES[a.__origin__]
_LOGGER.debug(
f"Default for {args[arg].name}: {type(default) if default is not inspect.Signature.empty else inspect.Signature.empty}"
)
if args[arg].name in MODULE_FLOW_TYPES:
flow_type = MODULE_FLOW_TYPES[args[arg].name]
if flow_type.get("type") == "SELECT":
field_type = SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(label=x, value=x)
for x in flow_type.get("values")
],
translation_key="custom_flow_types",
mode=SelectSelectorMode.DROPDOWN,
multiple=flow_type.get("multiple", False),
)
)
if default == inspect.Signature.empty:
vol_args[vol.Required(args[arg].name, description=description)] = str
vol_args[vol.Required(args[arg].name, description=description)] = (
field_type or str
)
_LOGGER.debug(f"Required: {args[arg].name} as default type: str")
elif type(default) in SUPPORTED_ARG_TYPES or default is None:
elif field_type or default is None:
# Handle boolean, int, string, date, datetime, list defaults
vol_args[
vol.Optional(
@@ -423,7 +470,7 @@ class WasteCollectionConfigFlow(ConfigFlow, domain=DOMAIN): # type: ignore[call
description=description,
)
] = (
cv.string if default is None else SUPPORTED_ARG_TYPES[type(default)]
field_type or cv.string
)
else:
_LOGGER.debug(
@@ -522,7 +569,7 @@ class WasteCollectionConfigFlow(ConfigFlow, domain=DOMAIN): # type: ignore[call
)
if user_input is not None:
self._customize_types = list(set(user_input[CONF_TYPE]))
self._customize_types = list(set(user_input.get(CONF_TYPE, [])))
self._fetched_types = list({*self._fetched_types, *self._customize_types})
return await self.async_step_customize()
return self.async_show_form(step_id="customize_select", data_schema=schema)

View File

@@ -1652,6 +1652,18 @@
"default_params": {}
}
],
"Generic": [
{
"title": "ICS",
"module": "ics",
"default_params": {}
},
{
"title": "Static Source",
"module": "static",
"default_params": {}
}
],
"Germany": [
{
"title": "Abfall Stuttgart",
@@ -6065,12 +6077,5 @@
"module": "recyclecoach_com",
"default_params": {}
}
],
"Generic": [
{
"title": "ICS",
"module": "ics",
"default_params": {}
}
]
}

View File

@@ -107,6 +107,9 @@
}
},
"error": {
"invalid_weekday": "Ungültiger Wochentag. Bitte gib einen gültigen Wochentag an (MO, TU, WE, TH, FR, SA, SU).",
"invalid_count": "Ungültige Anzahl. Bitte gib eine gültige Nummer an.",
"invalid_weekdays": "Ungültiges Wochentagsformat bitte gib einen Wochentag (MO, TU, WE, TH, FR, SA, SU) oder ein Dictionary wie MO: 1.",
"invalid_source": "Du hast keine Quelle ausgewählt. Bitte wähle eine gültige Quelle aus.",
"fetch_error": "Die Quelle hat eine ungültige Antwort zurückgegeben: \"{fetch_error_message}\". Bitte überprüfe die Argumente und versuche es erneut.",
"fetch_empty": "Die Quelle hat eine leere Antwort zurückgegeben. Bitte überprüfe die Argumente und versuche es erneut.",
@@ -213,6 +216,14 @@
"hidden": "Versteckt"
}
},
"custom_flow_types": {
"options": {
"YEARLY": "Jährlich",
"MONTHLY": "Monatlich",
"WEEKLY": "Wöchentlich",
"DAILY": "Täglich"
}
},
"sensor_select": {
"options": {
"sensor_select_add_new": "Neuen Sensor hinzufügen"

View File

@@ -245,7 +245,17 @@
"sector": "Sector",
"phone": "Phone",
"strasse": "Strasse",
"year_field": "Year Field"
"year_field": "Year Field",
"weekdays": "Weekdays",
"dates": "Dates",
"type": "Type",
"until": "Until",
"excludes": "Excludes",
"frequency": "Frequency",
"headers": "Headers",
"start": "Start",
"count": "Count",
"interval": "Interval"
},
"data_description": {
"calendar_title": "A more readable, or user-friendly, name for the waste calendar. If nothing is provided, the name returned by the source will be used."
@@ -474,11 +484,24 @@
"sector": "Sector",
"phone": "Phone",
"strasse": "Strasse",
"year_field": "Year Field"
"year_field": "Year Field",
"weekdays": "Weekdays",
"dates": "Dates",
"type": "Type",
"until": "Until",
"excludes": "Excludes",
"frequency": "Frequency",
"headers": "Headers",
"start": "Start",
"count": "Count",
"interval": "Interval"
}
}
},
"error": {
"invalid_weekday": "Invalid weekday. Please provide a valid weekday (MO, TU, WE, TH, FR, SA, SU).",
"invalid_count": "Invalid count. Please provide a valid number.",
"invalid_weekdays": "Invalid Weekday format please proivde one (MO, TU, WE, TH, FR, SA, SU) or a dictonary like MO: 1.",
"invalid_source": "You did not select a source. Please select a valid source.",
"fetch_error": "The source returned an invalid response: \"{fetch_error_message}\". Please check the provided arguments and try again.",
"fetch_empty": "The source returned an empty response. Please check the provided arguments and try again.",
@@ -586,6 +609,14 @@
"hidden": "Hidden"
}
},
"custom_flow_types": {
"options": {
"YEARLY": "Yearly",
"MONTHLY": "Monthly",
"WEEKLY": "Weekly",
"DAILY": "Daily"
}
},
"sensor_select": {
"options": {
"sensor_select_add_new": "Add new sensor"

View File

@@ -1,4 +1,5 @@
import datetime
import logging
from collections import OrderedDict
from dateutil import parser
@@ -54,24 +55,79 @@ TEST_CASES = {
FREQNAMES = ["YEARLY", "MONTHLY", "WEEKLY", "DAILY"]
WEEKDAY_MAP = {"MO": MO, "TU": TU, "WE": WE, "TH": TH, "FR": FR, "SA": SA, "SU": SU}
_LOGGER = logging.getLogger(__name__)
def validate_params(user_input):
errors = {}
if not (weekdays := user_input.get("weekdays")):
return errors
if isinstance(weekdays, str):
if weekdays not in WEEKDAY_MAP:
errors["weekdays"] = "invalid_weekday"
return errors
if not isinstance(weekdays, dict):
for wday, count in weekdays.items():
if wday not in WEEKDAY_MAP:
errors["weekdays"] = "invalid_weekday"
break
if not isinstance(count, int):
errors["weekdays"] = "invalid_count"
break
return errors
errors["weekdays"] = "invalid_weekdays"
return errors
CONFIG_FLOW_TYPES = {
"frequency": {"type": "SELECT", "values": FREQNAMES, "multiple": False}
}
def check_dates(dates):
if not isinstance(dates, list):
return False
for date in dates:
try:
parser.isoparse(date)
except ValueError:
return False
return True
def check_date(date):
try:
parser.isoparse(date)
except ValueError:
return False
return True
def get_tyep(params):
return type(params)
class Source:
def __init__(
self,
type: str,
dates: list[str] | None = None,
dates: list[datetime.date | str] | None = None,
frequency: str | None = None,
interval: int = 1,
start: str | None = None,
until: str | None = None,
start: datetime.date | str | None = None,
until: datetime.date | str | None = None,
count: int | None = None,
excludes: list[str] | None = None,
weekdays: list[str | int]
| dict[str | int, int | str | None]
| str
| None = None,
excludes: list[datetime.date | str] | None = None,
weekdays: str | dict[str | int, int | str | None] | None = None,
):
for d in dates or []:
_LOGGER.debug(f"date: {d}")
_LOGGER.debug(f"date type: {get_tyep(d)}")
self._weekdays: list[weekday] | None = None
if weekdays is not None:
self._weekdays = []
@@ -94,18 +150,34 @@ class Source:
self._weekdays = None
self._type = type
self._dates = [parser.isoparse(d).date() for d in dates or []]
self._dates = [
d if isinstance(d, datetime.date) else parser.isoparse(d).date()
for d in dates or []
]
self._recurrence = FREQNAMES.index(frequency) if frequency is not None else None
self._interval = interval
self._start = parser.isoparse(start).date() if start else None
self._start = (
start
if isinstance(start, datetime.date)
else parser.isoparse(start).date()
if start
else None
)
if until:
self._until: datetime.date | None = parser.isoparse(until).date()
self._until: datetime.date | None = (
until
if isinstance(until, datetime.date)
else parser.isoparse(until).date()
)
self._count = None
else:
self._until = None
self._count = count if count else 10
self._excludes = [parser.isoparse(d).date() for d in excludes or []]
self._excludes = [
d if isinstance(d, datetime.date) else parser.isoparse(d).date()
for d in excludes or []
]
def add_weekday(self, weekday, count: int):
if self._weekdays is None:

View File

@@ -15,9 +15,14 @@ import yaml
SECRET_FILENAME = "secrets.yaml"
SECRET_REGEX = re.compile(r"!secret\s(\w+)")
BLACK_LIST = {
GENERICS = {
"/doc/source/ics.md",
"/doc/source/static.md",
}
BLACK_LIST = {
*GENERICS,
"/doc/source/multiple.md",
"/doc/source/example.md",
}
@@ -131,9 +136,12 @@ def main() -> None:
# sort into countries
country_code_map = make_country_code_map()
countries: dict[str, list[SourceInfo]] = {}
generics: list[SourceInfo] = []
orphans: list[SourceInfo] = []
for s in sources:
if s.filename in GENERICS:
generics.append(s)
if s.filename in BLACK_LIST:
continue # skip
@@ -149,7 +157,7 @@ def main() -> None:
for o in orphans:
print(o)
update_json(countries)
update_json(countries, generics=generics)
update_readme_md(countries)
update_info_md(countries)
@@ -339,6 +347,8 @@ def multiline_indent(s, numspaces):
def beautify_url(url):
if url is None:
return ""
url = url.removesuffix("/")
url = url.removeprefix("http://")
url = url.removeprefix("https://")
@@ -346,9 +356,13 @@ def beautify_url(url):
return url
def update_json(countries: dict[str, list[SourceInfo]]):
def update_json(
countries: dict[str, list[SourceInfo]], generics: list[SourceInfo] = []
):
params = set()
param_translations: dict[str, dict[str, str]] = {}
countries = countries.copy()
countries["Generic"] = generics
# generate country list
output: dict[str, list[dict[str, str | dict[str, Any]]]] = {}
for country in sorted(countries):
@@ -376,7 +390,7 @@ def update_json(countries: dict[str, list[SourceInfo]]):
else:
param_translations[key] = value.copy()
output["Generic"] = [{"title": "ICS", "module": "ics", "default_params": {}}]
# output["Generic"] = [{"title": "ICS", "module": "ics", "default_params": {}}, {"title": "Static", "module": "static", "default_params": {}}]
for lang in LANGUAGES:
tranlation_file = (