From f49299b0098135ddb996bdf317d4b1e681df1ffb Mon Sep 17 00:00:00 2001 From: hanwg Date: Fri, 10 Oct 2025 21:50:54 +0800 Subject: [PATCH] Add edit message media feature for Telegram bot (#151034) --- .../components/telegram_bot/__init__.py | 35 +++++++++ homeassistant/components/telegram_bot/bot.py | 68 ++++++++++++++++- .../components/telegram_bot/const.py | 3 + .../components/telegram_bot/icons.json | 3 + .../components/telegram_bot/services.yaml | 71 ++++++++++++++++++ .../components/telegram_bot/strings.json | 67 +++++++++++++++++ .../telegram_bot/test_telegram_bot.py | 75 ++++++++++++++++++- 7 files changed, 319 insertions(+), 3 deletions(-) diff --git a/homeassistant/components/telegram_bot/__init__.py b/homeassistant/components/telegram_bot/__init__.py index 91bbc088744..be91d7b0daf 100644 --- a/homeassistant/components/telegram_bot/__init__.py +++ b/homeassistant/components/telegram_bot/__init__.py @@ -8,6 +8,7 @@ from types import ModuleType from typing import Any from telegram import Bot +from telegram.constants import InputMediaType from telegram.error import InvalidToken, TelegramError import voluptuous as vol @@ -52,6 +53,7 @@ from .const import ( ATTR_IS_BIG, ATTR_KEYBOARD, ATTR_KEYBOARD_INLINE, + ATTR_MEDIA_TYPE, ATTR_MESSAGE, ATTR_MESSAGE_TAG, ATTR_MESSAGE_THREAD_ID, @@ -98,6 +100,7 @@ from .const import ( SERVICE_DELETE_MESSAGE, SERVICE_EDIT_CAPTION, SERVICE_EDIT_MESSAGE, + SERVICE_EDIT_MESSAGE_MEDIA, SERVICE_EDIT_REPLYMARKUP, SERVICE_LEAVE_CHAT, SERVICE_SEND_ANIMATION, @@ -233,6 +236,35 @@ SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend( } ) +SERVICE_SCHEMA_EDIT_MESSAGE_MEDIA = vol.Schema( + { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, + vol.Required(ATTR_MESSAGEID): vol.Any( + cv.positive_int, vol.All(cv.string, "last") + ), + vol.Required(ATTR_CHAT_ID): vol.Coerce(int), + vol.Optional(ATTR_TIMEOUT): cv.positive_int, + vol.Optional(ATTR_CAPTION): cv.string, + vol.Required(ATTR_MEDIA_TYPE): vol.In( + ( + str(InputMediaType.ANIMATION), + str(InputMediaType.AUDIO), + str(InputMediaType.VIDEO), + str(InputMediaType.DOCUMENT), + str(InputMediaType.PHOTO), + ) + ), + vol.Optional(ATTR_URL): cv.string, + vol.Optional(ATTR_FILE): cv.string, + vol.Optional(ATTR_USERNAME): cv.string, + vol.Optional(ATTR_PASSWORD): cv.string, + vol.Optional(ATTR_AUTHENTICATION): cv.string, + vol.Optional(ATTR_VERIFY_SSL): cv.boolean, + vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list, + }, + extra=vol.ALLOW_EXTRA, +) + SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema( { vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, @@ -311,6 +343,7 @@ SERVICE_MAP = { SERVICE_SEND_LOCATION: SERVICE_SCHEMA_SEND_LOCATION, SERVICE_SEND_POLL: SERVICE_SCHEMA_SEND_POLL, SERVICE_EDIT_MESSAGE: SERVICE_SCHEMA_EDIT_MESSAGE, + SERVICE_EDIT_MESSAGE_MEDIA: SERVICE_SCHEMA_EDIT_MESSAGE_MEDIA, SERVICE_EDIT_CAPTION: SERVICE_SCHEMA_EDIT_CAPTION, SERVICE_EDIT_REPLYMARKUP: SERVICE_SCHEMA_EDIT_REPLYMARKUP, SERVICE_ANSWER_CALLBACK_QUERY: SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY, @@ -435,6 +468,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: await notify_service.leave_chat(context=service.context, **kwargs) elif msgtype == SERVICE_SET_MESSAGE_REACTION: await notify_service.set_message_reaction(context=service.context, **kwargs) + elif msgtype == SERVICE_EDIT_MESSAGE_MEDIA: + await notify_service.edit_message_media(context=service.context, **kwargs) else: await notify_service.edit_message( msgtype, context=service.context, **kwargs diff --git a/homeassistant/components/telegram_bot/bot.py b/homeassistant/components/telegram_bot/bot.py index 42bd493489b..f5fbfafa02b 100644 --- a/homeassistant/components/telegram_bot/bot.py +++ b/homeassistant/components/telegram_bot/bot.py @@ -15,6 +15,12 @@ from telegram import ( CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, + InputMedia, + InputMediaAnimation, + InputMediaAudio, + InputMediaDocument, + InputMediaPhoto, + InputMediaVideo, InputPollOption, Message, ReplyKeyboardMarkup, @@ -22,7 +28,7 @@ from telegram import ( Update, User, ) -from telegram.constants import ParseMode +from telegram.constants import InputMediaType, ParseMode from telegram.error import TelegramError from telegram.ext import CallbackContext, filters from telegram.request import HTTPXRequest @@ -52,6 +58,7 @@ from .const import ( ATTR_FILE, ATTR_FROM_FIRST, ATTR_FROM_LAST, + ATTR_INLINE_MESSAGE_ID, ATTR_KEYBOARD, ATTR_KEYBOARD_INLINE, ATTR_MESSAGE, @@ -299,7 +306,7 @@ class TelegramNotificationService: ): message_id = self._last_message_id[chat_id] else: - inline_message_id = msg_data["inline_message_id"] + inline_message_id = msg_data[ATTR_INLINE_MESSAGE_ID] return message_id, inline_message_id def get_target_chat_ids(self, target: int | list[int] | None) -> list[int]: @@ -527,6 +534,63 @@ class TelegramNotificationService: self._last_message_id[chat_id] -= 1 return deleted + async def edit_message_media( + self, + media_type: str, + chat_id: int | None = None, + context: Context | None = None, + **kwargs: Any, + ) -> Any: + "Edit message media of a previously sent message." + chat_id = self.get_target_chat_ids(chat_id)[0] + message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id) + params = self._get_msg_kwargs(kwargs) + _LOGGER.debug( + "Edit message media %s in chat ID %s with params: %s", + message_id or inline_message_id, + chat_id, + params, + ) + + file_content = await load_data( + self.hass, + url=kwargs.get(ATTR_URL), + filepath=kwargs.get(ATTR_FILE), + username=kwargs.get(ATTR_USERNAME, ""), + password=kwargs.get(ATTR_PASSWORD, ""), + authentication=kwargs.get(ATTR_AUTHENTICATION), + verify_ssl=( + get_default_context() + if kwargs.get(ATTR_VERIFY_SSL, False) + else get_default_no_verify_context() + ), + ) + + media: InputMedia + if media_type == InputMediaType.ANIMATION: + media = InputMediaAnimation(file_content, caption=kwargs.get(ATTR_CAPTION)) + elif media_type == InputMediaType.AUDIO: + media = InputMediaAudio(file_content, caption=kwargs.get(ATTR_CAPTION)) + elif media_type == InputMediaType.DOCUMENT: + media = InputMediaDocument(file_content, caption=kwargs.get(ATTR_CAPTION)) + elif media_type == InputMediaType.PHOTO: + media = InputMediaPhoto(file_content, caption=kwargs.get(ATTR_CAPTION)) + else: + media = InputMediaVideo(file_content, caption=kwargs.get(ATTR_CAPTION)) + + return await self._send_msg( + self.bot.edit_message_media, + "Error editing message media", + params[ATTR_MESSAGE_TAG], + media=media, + chat_id=chat_id, + message_id=message_id, + inline_message_id=inline_message_id, + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + context=context, + ) + async def edit_message( self, type_edit: str, diff --git a/homeassistant/components/telegram_bot/const.py b/homeassistant/components/telegram_bot/const.py index 34b8a476c78..e891e1fa639 100644 --- a/homeassistant/components/telegram_bot/const.py +++ b/homeassistant/components/telegram_bot/const.py @@ -44,6 +44,7 @@ SERVICE_SEND_LOCATION = "send_location" SERVICE_SEND_POLL = "send_poll" SERVICE_SET_MESSAGE_REACTION = "set_message_reaction" SERVICE_EDIT_MESSAGE = "edit_message" +SERVICE_EDIT_MESSAGE_MEDIA = "edit_message_media" SERVICE_EDIT_CAPTION = "edit_caption" SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup" SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query" @@ -96,6 +97,8 @@ ATTR_RESIZE_KEYBOARD = "resize_keyboard" ATTR_ONE_TIME_KEYBOARD = "one_time_keyboard" ATTR_KEYBOARD_INLINE = "inline_keyboard" ATTR_MESSAGEID = "message_id" +ATTR_INLINE_MESSAGE_ID = "inline_message_id" +ATTR_MEDIA_TYPE = "media_type" ATTR_MSG = "message" ATTR_MSGID = "id" ATTR_PARSER = "parse_mode" diff --git a/homeassistant/components/telegram_bot/icons.json b/homeassistant/components/telegram_bot/icons.json index 3208fdfbc3e..0df25f97944 100644 --- a/homeassistant/components/telegram_bot/icons.json +++ b/homeassistant/components/telegram_bot/icons.json @@ -33,6 +33,9 @@ "edit_message": { "service": "mdi:pencil" }, + "edit_message_media": { + "service": "mdi:pencil" + }, "edit_caption": { "service": "mdi:pencil" }, diff --git a/homeassistant/components/telegram_bot/services.yaml b/homeassistant/components/telegram_bot/services.yaml index e0e03921a93..b38bd23bb1d 100644 --- a/homeassistant/components/telegram_bot/services.yaml +++ b/homeassistant/components/telegram_bot/services.yaml @@ -746,6 +746,77 @@ edit_message: selector: object: +edit_message_media: + fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot + message_id: + required: true + example: "{{ trigger.event.data.message.message_id }}" + selector: + text: + chat_id: + required: true + example: 12345 + selector: + text: + timeout: + selector: + number: + min: 1 + max: 3600 + unit_of_measurement: seconds + media_type: + selector: + select: + options: + - "animation" + - "audio" + - "document" + - "photo" + - "video" + translation_key: "media_type" + url: + example: "http://example.org/path/to/the/image.png" + selector: + text: + file: + example: "/path/to/the/image.png" + selector: + text: + caption: + example: Document Title xy + selector: + text: + authentication: + selector: + select: + options: + - "basic" + - "digest" + - "bearer_token" + translation_key: "authentication" + username: + example: myuser + selector: + text: + password: + example: myuser_pwd + selector: + text: + type: password + verify_ssl: + selector: + boolean: + inline_keyboard: + example: + '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], + ["Text button2", "/button2"]], [["Text button3", "/button3"]]]' + selector: + object: + edit_caption: fields: config_entry_id: diff --git a/homeassistant/components/telegram_bot/strings.json b/homeassistant/components/telegram_bot/strings.json index 759b22a3368..1a4d8c0b30a 100644 --- a/homeassistant/components/telegram_bot/strings.json +++ b/homeassistant/components/telegram_bot/strings.json @@ -153,6 +153,15 @@ "record_video_note": "Recording video note", "upload_video_note": "Uploading video note" } + }, + "media_type": { + "options": { + "animation": "Animation", + "audio": "Audio", + "document": "Document", + "photo": "Photo", + "video": "Video" + } } }, "services": { @@ -814,6 +823,64 @@ } } }, + "edit_message_media": { + "name": "Edit message media", + "description": "Edits the media content of a previously sent message.", + "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to edit the message media." + }, + "message_id": { + "name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]", + "description": "[%key:component::telegram_bot::services::edit_message::fields::message_id::description%]" + }, + "chat_id": { + "name": "[%key:component::telegram_bot::services::edit_message::fields::chat_id::name%]", + "description": "[%key:component::telegram_bot::services::edit_message::fields::chat_id::description%]" + }, + "media_type": { + "name": "Media type", + "description": "Type for the new media." + }, + "url": { + "name": "[%key:common::config_flow::data::url%]", + "description": "Remote path to the media." + }, + "file": { + "name": "[%key:component::telegram_bot::services::send_photo::fields::file::name%]", + "description": "Local path to the media." + }, + "caption": { + "name": "[%key:component::telegram_bot::services::send_photo::fields::caption::name%]", + "description": "The title of the media." + }, + "username": { + "name": "[%key:common::config_flow::data::username%]", + "description": "[%key:component::telegram_bot::services::send_photo::fields::username::description%]" + }, + "password": { + "name": "[%key:common::config_flow::data::password%]", + "description": "[%key:component::telegram_bot::services::send_photo::fields::password::description%]" + }, + "authentication": { + "name": "[%key:component::telegram_bot::services::send_photo::fields::authentication::name%]", + "description": "[%key:component::telegram_bot::services::send_photo::fields::authentication::description%]" + }, + "verify_ssl": { + "name": "[%key:component::telegram_bot::services::send_photo::fields::verify_ssl::name%]", + "description": "[%key:component::telegram_bot::services::send_photo::fields::verify_ssl::description%]" + }, + "timeout": { + "name": "[%key:component::telegram_bot::services::send_photo::fields::timeout::name%]", + "description": "Timeout for sending the media in seconds." + }, + "inline_keyboard": { + "name": "[%key:component::telegram_bot::services::send_message::fields::inline_keyboard::name%]", + "description": "[%key:component::telegram_bot::services::send_message::fields::inline_keyboard::description%]" + } + } + }, "edit_caption": { "name": "Edit caption", "description": "Edits the caption of a previously sent message.", diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index cda2583e74b..ba8bf41b52b 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock, mock_open, patch import pytest from telegram import Chat, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update -from telegram.constants import ChatType, ParseMode +from telegram.constants import ChatType, InputMediaType, ParseMode from telegram.error import ( InvalidToken, NetworkError, @@ -33,6 +33,7 @@ from homeassistant.components.telegram_bot.const import ( ATTR_FILE, ATTR_KEYBOARD, ATTR_KEYBOARD_INLINE, + ATTR_MEDIA_TYPE, ATTR_MESSAGE, ATTR_MESSAGE_TAG, ATTR_MESSAGE_THREAD_ID, @@ -59,6 +60,7 @@ from homeassistant.components.telegram_bot.const import ( SERVICE_DELETE_MESSAGE, SERVICE_EDIT_CAPTION, SERVICE_EDIT_MESSAGE, + SERVICE_EDIT_MESSAGE_MEDIA, SERVICE_EDIT_REPLYMARKUP, SERVICE_LEAVE_CHAT, SERVICE_SEND_ANIMATION, @@ -888,6 +890,77 @@ async def test_delete_message( mock.assert_called_once() +@pytest.mark.parametrize( + ("media_type", "expected_media_class"), + [ + ( + InputMediaType.ANIMATION, + "InputMediaAnimation", + ), + ( + InputMediaType.AUDIO, + "InputMediaAudio", + ), + ( + InputMediaType.DOCUMENT, + "InputMediaDocument", + ), + ( + InputMediaType.PHOTO, + "InputMediaPhoto", + ), + ( + InputMediaType.VIDEO, + "InputMediaVideo", + ), + ], +) +async def test_edit_message_media( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, + media_type: str, + expected_media_class: str, +) -> None: + """Test edit message media.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + hass.config.allowlist_external_dirs.add("/tmp/") # noqa: S108 + write_utf8_file("/tmp/mock", "mock file contents") # noqa: S108 + + with patch( + "homeassistant.components.telegram_bot.bot.Bot.edit_message_media", + AsyncMock(return_value=True), + ) as mock: + await hass.services.async_call( + DOMAIN, + SERVICE_EDIT_MESSAGE_MEDIA, + { + ATTR_CAPTION: "mock caption", + ATTR_FILE: "/tmp/mock", # noqa: S108 + ATTR_MEDIA_TYPE: media_type, + ATTR_MESSAGEID: 12345, + ATTR_CHAT_ID: 123456, + ATTR_TIMEOUT: 10, + ATTR_KEYBOARD_INLINE: "/mock", + }, + blocking=True, + ) + + await hass.async_block_till_done() + mock.assert_called_once() + assert mock.call_args[1]["media"].__class__.__name__ == expected_media_class + assert mock.call_args[1]["media"].caption == "mock caption" + assert mock.call_args[1]["chat_id"] == 123456 + assert mock.call_args[1]["message_id"] == 12345 + assert mock.call_args[1]["reply_markup"] == InlineKeyboardMarkup( + [[InlineKeyboardButton(callback_data="/mock", text="MOCK")]] + ) + assert mock.call_args[1]["read_timeout"] == 10 + + async def test_edit_message( hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry,