diff --git a/homeassistant/components/image/__init__.py b/homeassistant/components/image/__init__.py index 7bf0060f593..5f7c497fdb4 100644 --- a/homeassistant/components/image/__init__.py +++ b/homeassistant/components/image/__init__.py @@ -70,6 +70,20 @@ LAST_FRAME_MARKER = bytes(f"\r\n--{FRAME_BOUNDARY}--\r\n", "utf-8") IMAGE_SERVICE_SNAPSHOT: VolDictType = {vol.Required(ATTR_FILENAME): cv.string} +MAP_MAGIC_NUMBERS_TO_CONTENT_TYPE = { + b"\x89PNG": "image/png", + b"GIF8": "image/gif", + b"RIFF": "image/webp", + b"\x49\x49\x2a\x00": "image/tiff", + b"\x4d\x4d\x00\x2a": "image/tiff", + b"\xff\xd8\xff\xdb": "image/jpeg", + b"\xff\xd8\xff\xe0": "image/jpeg", + b"\xff\xd8\xff\xed": "image/jpeg", + b"\xff\xd8\xff\xee": "image/jpeg", + b"\xff\xd8\xff\xe1": "image/jpeg", + b"\xff\xd8\xff\xe2": "image/jpeg", +} + class ImageEntityDescription(EntityDescription, frozen_or_thawed=True): """A class that describes image entities.""" @@ -94,6 +108,11 @@ def valid_image_content_type(content_type: str | None) -> str: return content_type +def infer_image_type(content: bytes) -> str | None: + """Infer image type from first 4 bytes (magic number).""" + return MAP_MAGIC_NUMBERS_TO_CONTENT_TYPE.get(content[:4]) + + async def _async_get_image(image_entity: ImageEntity, timeout: int) -> Image: """Fetch image from an image entity.""" with suppress(asyncio.CancelledError, TimeoutError, ImageContentTypeError): @@ -242,7 +261,9 @@ class ImageEntity(Entity, cached_properties=CACHED_PROPERTIES_WITH_ATTR_): async def _async_load_image_from_url(self, url: str) -> Image | None: """Load an image by url.""" if response := await self._fetch_url(url): - content_type = response.headers.get("content-type") + content_type = response.headers.get("content-type") or infer_image_type( + response.content + ) try: return Image( content=response.content, diff --git a/tests/components/image/test_init.py b/tests/components/image/test_init.py index 0a1c939c474..8bb05705055 100644 --- a/tests/components/image/test_init.py +++ b/tests/components/image/test_init.py @@ -348,6 +348,51 @@ async def test_fetch_image_url_wrong_content_type( assert resp.status == HTTPStatus.INTERNAL_SERVER_ERROR +@respx.mock +@pytest.mark.parametrize( + ("content", "content_type"), + [ + (b"\x89PNG", "image/png"), + (b"\xff\xd8\xff\xdb", "image/jpeg"), + (b"\xff\xd8\xff\xe0", "image/jpeg"), + (b"\xff\xd8\xff\xed", "image/jpeg"), + (b"\xff\xd8\xff\xee", "image/jpeg"), + (b"\xff\xd8\xff\xe1", "image/jpeg"), + (b"\xff\xd8\xff\xe2", "image/jpeg"), + (b"GIF89a", "image/gif"), + (b"GIF87a", "image/gif"), + (b"RIFF", "image/webp"), + (b"\x49\x49\x2a\x00", "image/tiff"), + (b"\x4d\x4d\x00\x2a", "image/tiff"), + ], +) +async def test_fetch_image_url_infer_content_type_from_magic_number( + hass: HomeAssistant, + hass_client: ClientSessionGenerator, + content: bytes, + content_type: str, +) -> None: + """Test fetching an image and inferring content-type from magic number.""" + respx.get("https://example.com/myimage.jpg").respond( + status_code=HTTPStatus.OK, content=content + ) + + mock_integration(hass, MockModule(domain="test")) + mock_platform(hass, "test.image", MockImagePlatform([MockURLImageEntity(hass)])) + assert await async_setup_component( + hass, image.DOMAIN, {"image": {"platform": "test"}} + ) + await hass.async_block_till_done() + + client = await hass_client() + + resp = await client.get("/api/image_proxy/image.test") + assert resp.status == HTTPStatus.OK + body = await resp.read() + assert body == content + assert resp.content_type == content_type + + async def test_image_stream( hass: HomeAssistant, hass_client: ClientSessionGenerator,