-
Notifications
You must be signed in to change notification settings - Fork 396
Animated thumbnails #18831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Animated thumbnails #18831
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add animated image support to the media thumbnailer. |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -22,9 +22,9 @@ | |||||
import logging | ||||||
from io import BytesIO | ||||||
from types import TracebackType | ||||||
from typing import TYPE_CHECKING, List, Optional, Tuple, Type | ||||||
from typing import TYPE_CHECKING, List, Optional, Tuple, Type, cast | ||||||
|
||||||
from PIL import Image | ||||||
from PIL import Image, ImageSequence | ||||||
|
||||||
from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error | ||||||
from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP | ||||||
|
@@ -152,20 +152,24 @@ def aspect(self, max_width: int, max_height: int) -> Tuple[int, int]: | |||||
else: | ||||||
return max((max_height * self.width) // self.height, 1), max_height | ||||||
|
||||||
def _resize_image(self, image: Image.Image, width: int, height: int) -> Image.Image:
    """Resize the given frame to the requested dimensions.

    1-bit or 8-bit colour palette images are converted to RGB first,
    since palette images would otherwise be scaled using nearest
    neighbour, which looks awful. If the palette carries a transparency
    entry, RGBA is used instead so the alpha channel survives.
    """
    if image.mode in ("1", "L", "P"):
        # Pick RGBA only when the image actually declares transparency.
        if image.info.get("transparency") is not None:
            prepared = image.convert("RGBA")
        else:
            prepared = image.convert("RGB")
    else:
        prepared = image
    return prepared.resize((width, height), Image.LANCZOS)
|
||||||
def _resize(self, width: int, height: int) -> Image.Image:
    """Backwards-compatible single-image resize operating on ``self.image``."""
    return self._resize_image(self.image, width, height)
|
||||||
@trace | ||||||
def scale(self, width: int, height: int, output_type: str) -> BytesIO: | ||||||
|
@@ -174,8 +178,34 @@ def scale(self, width: int, height: int, output_type: str) -> BytesIO: | |||||
Returns: | ||||||
The bytes of the encoded image ready to be written to disk | ||||||
""" | ||||||
with self._resize(width, height) as scaled: | ||||||
return self._encode_image(scaled, output_type) | ||||||
# If it's an animated image, generate an animated thumbnail (preserve | ||||||
# animation). Otherwise, fall back to static processing using the first | ||||||
# frame. | ||||||
if getattr(self.image, "is_animated", False): | ||||||
frames = [] | ||||||
durations = [] | ||||||
loop = self.image.info.get("loop", 0) | ||||||
transparency = self.image.info.get("transparency", None) | ||||||
for frame in ImageSequence.Iterator(self.image): | ||||||
# Copy the frame to avoid referencing the original image memory | ||||||
f = frame.copy() | ||||||
if f.mode != "RGBA": | ||||||
f = f.convert("RGBA") | ||||||
resized = self._resize_image(f, width, height) | ||||||
frames.append(resized) | ||||||
durations.append( | ||||||
frame.info.get("duration") or self.image.info.get("duration") or 100 | ||||||
) | ||||||
return self._encode_animated(frames, durations, loop, transparency) | ||||||
else: | ||||||
# Static processing | ||||||
first = next(ImageSequence.Iterator(self.image)) | ||||||
if first is not self.image: | ||||||
base = first.copy() | ||||||
else: | ||||||
base = self.image | ||||||
with self._resize_image(base, width, height) as scaled: | ||||||
return self._encode_image(scaled, output_type) | ||||||
|
||||||
@trace | ||||||
def crop(self, width: int, height: int, output_type: str) -> BytesIO: | ||||||
|
@@ -205,16 +235,81 @@ def crop(self, width: int, height: int, output_type: str) -> BytesIO: | |||||
crop_right = width + crop_left | ||||||
crop = (crop_left, 0, crop_right, height) | ||||||
|
||||||
with self._resize(scaled_width, scaled_height) as scaled_image: | ||||||
with scaled_image.crop(crop) as cropped: | ||||||
return self._encode_image(cropped, output_type) | ||||||
# If it's an animated image, generate an animated thumbnail (preserve | ||||||
# animation). Otherwise, fall back to static processing using the first | ||||||
# frame. | ||||||
if getattr(self.image, "is_animated", False): | ||||||
frames = [] | ||||||
durations = [] | ||||||
loop = self.image.info.get("loop", 0) | ||||||
transparency = self.image.info.get("transparency", None) | ||||||
for frame in ImageSequence.Iterator(self.image): | ||||||
f = frame.copy() | ||||||
if f.mode != "RGBA": | ||||||
f = f.convert("RGBA") | ||||||
scaled = self._resize_image(f, scaled_width, scaled_height) | ||||||
cropped = scaled.crop(crop) | ||||||
frames.append(cropped) | ||||||
durations.append( | ||||||
frame.info.get("duration") or self.image.info.get("duration") or 100 | ||||||
) | ||||||
return self._encode_animated(frames, durations, loop, transparency) | ||||||
else: | ||||||
# Static processing | ||||||
first = next(ImageSequence.Iterator(self.image)) | ||||||
if first is not self.image: | ||||||
base = first.copy() | ||||||
else: | ||||||
base = self.image | ||||||
with self._resize_image(base, scaled_width, scaled_height) as scaled_image: | ||||||
with scaled_image.crop(crop) as cropped: | ||||||
return self._encode_image(cropped, output_type) | ||||||
|
||||||
def _encode_image(self, output_image: Image.Image, output_type: str) -> BytesIO:
    """Serialise ``output_image`` into an in-memory buffer of the given type.

    Returns:
        The bytes of the encoded image, ready to be written to disk.
    """
    fmt = self.FORMATS[output_type]
    # NB: operator precedence is intentional — JPEG is always converted to
    # RGB; PNG only when the source mode is CMYK.
    if fmt == "JPEG" or (fmt == "PNG" and output_image.mode == "CMYK"):
        output_image = output_image.convert("RGB")
    buffer = BytesIO()
    output_image.save(buffer, fmt, quality=80)
    buffer.seek(0)
    return buffer
|
||||||
def _encode_animated(
    self,
    frames: List[Image.Image],
    durations: List[int],
    loop: int,
    transparency: Optional[int],
) -> BytesIO:
    """Encode a list of RGBA frames into an animated GIF.

    Attempts to preserve frame durations, loop count and transparency
    where possible.

    Args:
        frames: The already resized/cropped frames, in display order.
        durations: Per-frame display time in milliseconds (one per frame).
        loop: GIF loop count (0 means loop forever).
        transparency: Palette index to treat as transparent, if any.

    Returns:
        The bytes of the encoded GIF, ready to be written to disk.

    Raises:
        ThumbnailError: if there are no frames to encode.
    """
    if not frames:
        raise ThumbnailError("No frames to encode for animated GIF")

    # GIF frames must be palettised: quantise every frame to 'P' mode with
    # an adaptive palette so colours survive as well as possible.
    paletted_frames = []
    for frame in frames:
        if frame.mode != "RGBA":
            frame = frame.convert("RGBA")
        paletted_frames.append(frame.convert("P", palette=Image.ADAPTIVE))

    # Pass the save options as explicit keyword arguments rather than an
    # untyped **kwargs dict: mypy cannot check `**dict[str, object]`
    # against Image.save's signature, and the `cast(dict, ...)` this
    # previously required was flagged in review. `transparency` may only
    # be passed when we actually have a transparent index.
    output_bytes_io = BytesIO()
    first, rest = paletted_frames[0], paletted_frames[1:]
    if transparency is not None:
        first.save(
            output_bytes_io,
            format="GIF",
            save_all=True,
            append_images=rest,
            loop=loop,
            duration=durations,
            disposal=2,
            transparency=transparency,
        )
    else:
        first.save(
            output_bytes_io,
            format="GIF",
            save_all=True,
            append_images=rest,
            loop=loop,
            duration=durations,
            disposal=2,
        )
    output_bytes_io.seek(0)
    return output_bytes_io
|
||||||
def close(self) -> None: | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The animated processing logic is duplicated between scale() and crop() methods. Consider extracting this into a shared helper method to reduce code duplication.
Copilot uses AI. Check for mistakes.