Add edit message media feature for Telegram bot (#151034)

This commit is contained in:
hanwg
2025-10-10 21:50:54 +08:00
committed by GitHub
parent 1001da08f6
commit f49299b009
7 changed files with 319 additions and 3 deletions

View File

@@ -8,6 +8,7 @@ from types import ModuleType
from typing import Any
from telegram import Bot
from telegram.constants import InputMediaType
from telegram.error import InvalidToken, TelegramError
import voluptuous as vol
@@ -52,6 +53,7 @@ from .const import (
ATTR_IS_BIG,
ATTR_KEYBOARD,
ATTR_KEYBOARD_INLINE,
ATTR_MEDIA_TYPE,
ATTR_MESSAGE,
ATTR_MESSAGE_TAG,
ATTR_MESSAGE_THREAD_ID,
@@ -98,6 +100,7 @@ from .const import (
SERVICE_DELETE_MESSAGE,
SERVICE_EDIT_CAPTION,
SERVICE_EDIT_MESSAGE,
SERVICE_EDIT_MESSAGE_MEDIA,
SERVICE_EDIT_REPLYMARKUP,
SERVICE_LEAVE_CHAT,
SERVICE_SEND_ANIMATION,
@@ -233,6 +236,35 @@ SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend(
}
)
SERVICE_SCHEMA_EDIT_MESSAGE_MEDIA = vol.Schema(
{
vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string,
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Optional(ATTR_TIMEOUT): cv.positive_int,
vol.Optional(ATTR_CAPTION): cv.string,
vol.Required(ATTR_MEDIA_TYPE): vol.In(
(
str(InputMediaType.ANIMATION),
str(InputMediaType.AUDIO),
str(InputMediaType.VIDEO),
str(InputMediaType.DOCUMENT),
str(InputMediaType.PHOTO),
)
),
vol.Optional(ATTR_URL): cv.string,
vol.Optional(ATTR_FILE): cv.string,
vol.Optional(ATTR_USERNAME): cv.string,
vol.Optional(ATTR_PASSWORD): cv.string,
vol.Optional(ATTR_AUTHENTICATION): cv.string,
vol.Optional(ATTR_VERIFY_SSL): cv.boolean,
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema(
{
vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string,
@@ -311,6 +343,7 @@ SERVICE_MAP = {
SERVICE_SEND_LOCATION: SERVICE_SCHEMA_SEND_LOCATION,
SERVICE_SEND_POLL: SERVICE_SCHEMA_SEND_POLL,
SERVICE_EDIT_MESSAGE: SERVICE_SCHEMA_EDIT_MESSAGE,
SERVICE_EDIT_MESSAGE_MEDIA: SERVICE_SCHEMA_EDIT_MESSAGE_MEDIA,
SERVICE_EDIT_CAPTION: SERVICE_SCHEMA_EDIT_CAPTION,
SERVICE_EDIT_REPLYMARKUP: SERVICE_SCHEMA_EDIT_REPLYMARKUP,
SERVICE_ANSWER_CALLBACK_QUERY: SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY,
@@ -435,6 +468,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
await notify_service.leave_chat(context=service.context, **kwargs)
elif msgtype == SERVICE_SET_MESSAGE_REACTION:
await notify_service.set_message_reaction(context=service.context, **kwargs)
elif msgtype == SERVICE_EDIT_MESSAGE_MEDIA:
await notify_service.edit_message_media(context=service.context, **kwargs)
else:
await notify_service.edit_message(
msgtype, context=service.context, **kwargs

View File

@@ -15,6 +15,12 @@ from telegram import (
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
InputMedia,
InputMediaAnimation,
InputMediaAudio,
InputMediaDocument,
InputMediaPhoto,
InputMediaVideo,
InputPollOption,
Message,
ReplyKeyboardMarkup,
@@ -22,7 +28,7 @@ from telegram import (
Update,
User,
)
from telegram.constants import ParseMode
from telegram.constants import InputMediaType, ParseMode
from telegram.error import TelegramError
from telegram.ext import CallbackContext, filters
from telegram.request import HTTPXRequest
@@ -52,6 +58,7 @@ from .const import (
ATTR_FILE,
ATTR_FROM_FIRST,
ATTR_FROM_LAST,
ATTR_INLINE_MESSAGE_ID,
ATTR_KEYBOARD,
ATTR_KEYBOARD_INLINE,
ATTR_MESSAGE,
@@ -299,7 +306,7 @@ class TelegramNotificationService:
):
message_id = self._last_message_id[chat_id]
else:
inline_message_id = msg_data["inline_message_id"]
inline_message_id = msg_data[ATTR_INLINE_MESSAGE_ID]
return message_id, inline_message_id
def get_target_chat_ids(self, target: int | list[int] | None) -> list[int]:
@@ -527,6 +534,63 @@ class TelegramNotificationService:
self._last_message_id[chat_id] -= 1
return deleted
async def edit_message_media(
self,
media_type: str,
chat_id: int | None = None,
context: Context | None = None,
**kwargs: Any,
) -> Any:
"Edit message media of a previously sent message."
chat_id = self.get_target_chat_ids(chat_id)[0]
message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id)
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Edit message media %s in chat ID %s with params: %s",
message_id or inline_message_id,
chat_id,
params,
)
file_content = await load_data(
self.hass,
url=kwargs.get(ATTR_URL),
filepath=kwargs.get(ATTR_FILE),
username=kwargs.get(ATTR_USERNAME, ""),
password=kwargs.get(ATTR_PASSWORD, ""),
authentication=kwargs.get(ATTR_AUTHENTICATION),
verify_ssl=(
get_default_context()
if kwargs.get(ATTR_VERIFY_SSL, False)
else get_default_no_verify_context()
),
)
media: InputMedia
if media_type == InputMediaType.ANIMATION:
media = InputMediaAnimation(file_content, caption=kwargs.get(ATTR_CAPTION))
elif media_type == InputMediaType.AUDIO:
media = InputMediaAudio(file_content, caption=kwargs.get(ATTR_CAPTION))
elif media_type == InputMediaType.DOCUMENT:
media = InputMediaDocument(file_content, caption=kwargs.get(ATTR_CAPTION))
elif media_type == InputMediaType.PHOTO:
media = InputMediaPhoto(file_content, caption=kwargs.get(ATTR_CAPTION))
else:
media = InputMediaVideo(file_content, caption=kwargs.get(ATTR_CAPTION))
return await self._send_msg(
self.bot.edit_message_media,
"Error editing message media",
params[ATTR_MESSAGE_TAG],
media=media,
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
context=context,
)
async def edit_message(
self,
type_edit: str,

View File

@@ -44,6 +44,7 @@ SERVICE_SEND_LOCATION = "send_location"
SERVICE_SEND_POLL = "send_poll"
SERVICE_SET_MESSAGE_REACTION = "set_message_reaction"
SERVICE_EDIT_MESSAGE = "edit_message"
SERVICE_EDIT_MESSAGE_MEDIA = "edit_message_media"
SERVICE_EDIT_CAPTION = "edit_caption"
SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup"
SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query"
@@ -96,6 +97,8 @@ ATTR_RESIZE_KEYBOARD = "resize_keyboard"
ATTR_ONE_TIME_KEYBOARD = "one_time_keyboard"
ATTR_KEYBOARD_INLINE = "inline_keyboard"
ATTR_MESSAGEID = "message_id"
ATTR_INLINE_MESSAGE_ID = "inline_message_id"
ATTR_MEDIA_TYPE = "media_type"
ATTR_MSG = "message"
ATTR_MSGID = "id"
ATTR_PARSER = "parse_mode"

View File

@@ -33,6 +33,9 @@
"edit_message": {
"service": "mdi:pencil"
},
"edit_message_media": {
"service": "mdi:pencil"
},
"edit_caption": {
"service": "mdi:pencil"
},

View File

@@ -746,6 +746,77 @@ edit_message:
selector:
object:
edit_message_media:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message_id:
required: true
example: "{{ trigger.event.data.message.message_id }}"
selector:
text:
chat_id:
required: true
example: 12345
selector:
text:
timeout:
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
media_type:
selector:
select:
options:
- "animation"
- "audio"
- "document"
- "photo"
- "video"
translation_key: "media_type"
url:
example: "http://example.org/path/to/the/image.png"
selector:
text:
file:
example: "/path/to/the/image.png"
selector:
text:
caption:
example: Document Title xy
selector:
text:
authentication:
selector:
select:
options:
- "basic"
- "digest"
- "bearer_token"
translation_key: "authentication"
username:
example: myuser
selector:
text:
password:
example: myuser_pwd
selector:
text:
type: password
verify_ssl:
selector:
boolean:
inline_keyboard:
example:
'["/button1, /button2", "/button3"] or [[["Text button1", "/button1"],
["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
edit_caption:
fields:
config_entry_id:

View File

@@ -153,6 +153,15 @@
"record_video_note": "Recording video note",
"upload_video_note": "Uploading video note"
}
},
"media_type": {
"options": {
"animation": "Animation",
"audio": "Audio",
"document": "Document",
"photo": "Photo",
"video": "Video"
}
}
},
"services": {
@@ -814,6 +823,64 @@
}
}
},
"edit_message_media": {
"name": "Edit message media",
"description": "Edits the media content of a previously sent message.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to edit the message media."
},
"message_id": {
"name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]",
"description": "[%key:component::telegram_bot::services::edit_message::fields::message_id::description%]"
},
"chat_id": {
"name": "[%key:component::telegram_bot::services::edit_message::fields::chat_id::name%]",
"description": "[%key:component::telegram_bot::services::edit_message::fields::chat_id::description%]"
},
"media_type": {
"name": "Media type",
"description": "Type for the new media."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to the media."
},
"file": {
"name": "[%key:component::telegram_bot::services::send_photo::fields::file::name%]",
"description": "Local path to the media."
},
"caption": {
"name": "[%key:component::telegram_bot::services::send_photo::fields::caption::name%]",
"description": "The title of the media."
},
"username": {
"name": "[%key:common::config_flow::data::username%]",
"description": "[%key:component::telegram_bot::services::send_photo::fields::username::description%]"
},
"password": {
"name": "[%key:common::config_flow::data::password%]",
"description": "[%key:component::telegram_bot::services::send_photo::fields::password::description%]"
},
"authentication": {
"name": "[%key:component::telegram_bot::services::send_photo::fields::authentication::name%]",
"description": "[%key:component::telegram_bot::services::send_photo::fields::authentication::description%]"
},
"verify_ssl": {
"name": "[%key:component::telegram_bot::services::send_photo::fields::verify_ssl::name%]",
"description": "[%key:component::telegram_bot::services::send_photo::fields::verify_ssl::description%]"
},
"timeout": {
"name": "[%key:component::telegram_bot::services::send_photo::fields::timeout::name%]",
"description": "Timeout for sending the media in seconds."
},
"inline_keyboard": {
"name": "[%key:component::telegram_bot::services::send_message::fields::inline_keyboard::name%]",
"description": "[%key:component::telegram_bot::services::send_message::fields::inline_keyboard::description%]"
}
}
},
"edit_caption": {
"name": "Edit caption",
"description": "Edits the caption of a previously sent message.",

View File

@@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock, mock_open, patch
import pytest
from telegram import Chat, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update
from telegram.constants import ChatType, ParseMode
from telegram.constants import ChatType, InputMediaType, ParseMode
from telegram.error import (
InvalidToken,
NetworkError,
@@ -33,6 +33,7 @@ from homeassistant.components.telegram_bot.const import (
ATTR_FILE,
ATTR_KEYBOARD,
ATTR_KEYBOARD_INLINE,
ATTR_MEDIA_TYPE,
ATTR_MESSAGE,
ATTR_MESSAGE_TAG,
ATTR_MESSAGE_THREAD_ID,
@@ -59,6 +60,7 @@ from homeassistant.components.telegram_bot.const import (
SERVICE_DELETE_MESSAGE,
SERVICE_EDIT_CAPTION,
SERVICE_EDIT_MESSAGE,
SERVICE_EDIT_MESSAGE_MEDIA,
SERVICE_EDIT_REPLYMARKUP,
SERVICE_LEAVE_CHAT,
SERVICE_SEND_ANIMATION,
@@ -888,6 +890,77 @@ async def test_delete_message(
mock.assert_called_once()
@pytest.mark.parametrize(
("media_type", "expected_media_class"),
[
(
InputMediaType.ANIMATION,
"InputMediaAnimation",
),
(
InputMediaType.AUDIO,
"InputMediaAudio",
),
(
InputMediaType.DOCUMENT,
"InputMediaDocument",
),
(
InputMediaType.PHOTO,
"InputMediaPhoto",
),
(
InputMediaType.VIDEO,
"InputMediaVideo",
),
],
)
async def test_edit_message_media(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
media_type: str,
expected_media_class: str,
) -> None:
"""Test edit message media."""
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
hass.config.allowlist_external_dirs.add("/tmp/") # noqa: S108
write_utf8_file("/tmp/mock", "mock file contents") # noqa: S108
with patch(
"homeassistant.components.telegram_bot.bot.Bot.edit_message_media",
AsyncMock(return_value=True),
) as mock:
await hass.services.async_call(
DOMAIN,
SERVICE_EDIT_MESSAGE_MEDIA,
{
ATTR_CAPTION: "mock caption",
ATTR_FILE: "/tmp/mock", # noqa: S108
ATTR_MEDIA_TYPE: media_type,
ATTR_MESSAGEID: 12345,
ATTR_CHAT_ID: 123456,
ATTR_TIMEOUT: 10,
ATTR_KEYBOARD_INLINE: "/mock",
},
blocking=True,
)
await hass.async_block_till_done()
mock.assert_called_once()
assert mock.call_args[1]["media"].__class__.__name__ == expected_media_class
assert mock.call_args[1]["media"].caption == "mock caption"
assert mock.call_args[1]["chat_id"] == 123456
assert mock.call_args[1]["message_id"] == 12345
assert mock.call_args[1]["reply_markup"] == InlineKeyboardMarkup(
[[InlineKeyboardButton(callback_data="/mock", text="MOCK")]]
)
assert mock.call_args[1]["read_timeout"] == 10
async def test_edit_message(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,