Source code for cloud_server.thumbnail_generator

"""Thumbnail generation for images and videos."""

import logging
from pathlib import Path

import cv2
from PIL import Image

from cloud_server.models import FileMetadata

logger = logging.getLogger(__name__)


[docs] class ThumbnailGenerator: """Generate thumbnails for images and videos."""
[docs] def __init__(self, thumbnails_directory: Path) -> None: """Initialize thumbnail generator. :param Path thumbnails_directory: Directory to save generated thumbnails """ self._thumbnails_directory = thumbnails_directory self._thumbnails_directory.mkdir(parents=True, exist_ok=True) logger.info("Saving thumbnails to directory: %s", thumbnails_directory)
def _generate_image_thumbnail(self, filepath: Path) -> Image.Image: """Generate thumbnail for an image file. :param Path filepath: Path to source image :return Image.Image: The generated thumbnail image """ with Image.open(filepath) as img: thumbnail_img = img.copy() if img.mode == "RGBA": background = Image.new("RGB", img.size, (255, 255, 255)) background.paste(img, mask=img.split()[3]) thumbnail_img = background elif img.mode != "RGB": thumbnail_img = img.convert("RGB") return thumbnail_img def _generate_video_thumbnail(self, filepath: Path) -> Image.Image: """Generate thumbnail for a video file by extracting a frame. :param Path filepath: Path to source video :return Image.Image: The generated thumbnail image :raises OSError: If the video cannot be opened or read """ if not (video := cv2.VideoCapture(filepath)).isOpened(): error_msg = f"Could not open video file: {filepath}" logger.error(error_msg) raise OSError(error_msg) if (fps := video.get(cv2.CAP_PROP_FPS)) > 0: video.set(cv2.CAP_PROP_POS_FRAMES, int(fps)) success, frame = video.read() video.release() if not success or frame is None: error_msg = f"Could not read frame from video: {filepath}" logger.error(error_msg) raise OSError(error_msg) frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) return Image.fromarray(frame_rgb) def _save_thumbnail(self, img: Image.Image, output_path: Path, thumbnail_size: tuple[int, int]) -> None: """Save a PIL Image as a thumbnail. :param Image img: PIL Image to save :param Path output_path: Path to save the thumbnail :param tuple[int, int] thumbnail_size: Target thumbnail size (width, height) """ img.thumbnail(thumbnail_size, Image.Resampling.LANCZOS) img.save(output_path, "JPEG", quality=85, optimize=True)
[docs] def get_thumbnail_path(self, file_id: int) -> Path: """Get the thumbnail path for a given file ID. :param int file_id: Unique identifier for the file :return Path: Path to the thumbnail image """ return self._thumbnails_directory / f"{file_id}.jpg"
[docs] def generate_thumbnail(self, filepath: Path, mime_type: str, file_id: int, thumbnail_size: tuple[int, int]) -> None: """Generate thumbnail based on MIME type. :param Path filepath: Path to source file :param str mime_type: MIME type of the file :param int file_id: Unique identifier for the file :param tuple[int, int] thumbnail_size: Target thumbnail size (width, height) :raises FileExistsError: If a thumbnail already exists for the file :raises ValueError: If the MIME type is unsupported for thumbnail generation """ if (output_path := self.get_thumbnail_path(file_id)).exists(): error_msg = f"Thumbnail already exists for file {file_id}: {filepath}" logger.error(error_msg) raise FileExistsError(error_msg) if mime_type.startswith("image/"): thumbnail_img = self._generate_image_thumbnail(filepath=filepath) elif mime_type.startswith("video/"): thumbnail_img = self._generate_video_thumbnail(filepath=filepath) else: error_msg = f"Unsupported MIME type for thumbnail generation: {mime_type}" logger.error(error_msg) raise ValueError(error_msg) self._save_thumbnail(img=thumbnail_img, output_path=output_path, thumbnail_size=thumbnail_size)
[docs] def synchronize_with_storage( self, storage_directory: Path, files_metadata: list[FileMetadata], thumbnail_size: tuple[int, int] ) -> None: """Synchronize thumbnails with the storage directory by generating missing thumbnails and removing stale ones. :param Path storage_directory: Path to the server storage directory :param list[FileMetadata] files_metadata: File metadata in the database :param tuple[int, int] thumbnail_size: Target thumbnail size (width, height) """ if not any(storage_directory.iterdir()): logger.warning("Storage directory is empty, skipping synchronization: %s", storage_directory) return existing_thumbnails = {int(thumbnail.stem) for thumbnail in self._thumbnails_directory.glob("*.jpg")} for file_metadata in files_metadata: if not file_metadata.mime_type.startswith(("image/", "video/")) or file_metadata.id is None: continue if file_metadata.id not in existing_thumbnails: if not (filepath := storage_directory / file_metadata.filepath).is_file(): error_msg = f"File {file_metadata.id} does not exist in storage: {filepath}" logger.error(error_msg) continue try: self.generate_thumbnail( filepath=filepath, mime_type=file_metadata.mime_type, file_id=file_metadata.id, thumbnail_size=thumbnail_size, ) logger.info("Generated thumbnail for file %d: %s", file_metadata.id, filepath) except Exception: logger.exception("Failed to generate thumbnail for file %d: %s", file_metadata.id, filepath) continue existing_thumbnails.discard(file_metadata.id) for stale_id in existing_thumbnails: stale_thumbnail = self.get_thumbnail_path(file_id=stale_id) try: stale_thumbnail.unlink() logger.info("Removed stale thumbnail for file %d: %s", stale_id, stale_thumbnail) except Exception: logger.exception("Failed to remove stale thumbnail for file %d: %s", stale_id, stale_thumbnail) continue logger.info( "Synchronized %d thumbnails with storage directory.", len(list(self._thumbnails_directory.glob("*.jpg"))) )