# Copy from https://github.com/silentsokolov/flask-thumbnails/blob/master/flask_thumbnails/thumbnail.py import os from datetime import datetime import cv2 import time from io import BytesIO from pathlib import Path import numpy as np from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer from PIL import Image, ImageOps, PngImagePlugin from loguru import logger LARGE_ENOUGH_NUMBER = 100 PngImagePlugin.MAX_TEXT_CHUNK = LARGE_ENOUGH_NUMBER * (1024 ** 2) from .storage_backends import FilesystemStorageBackend from .utils import aspect_to_string, generate_filename, glob_img class FileManager(FileSystemEventHandler): def __init__(self, app=None): self.app = app self._default_root_directory = "media" self._default_thumbnail_directory = "media" self._default_root_url = "/" self._default_thumbnail_root_url = "/" self._default_format = "JPEG" self.output_dir: Path = None if app is not None: self.init_app(app) self.image_dir_filenames = [] self.output_dir_filenames = [] self.image_dir_observer = None self.output_dir_observer = None self.modified_time = { "image": datetime.utcnow(), "output": datetime.utcnow(), } def start(self): self.image_dir_filenames = self._media_names(self.root_directory) self.output_dir_filenames = self._media_names(self.output_dir) logger.info(f"Start watching image directory: {self.root_directory}") self.image_dir_observer = Observer() self.image_dir_observer.schedule(self, self.root_directory, recursive=False) self.image_dir_observer.start() logger.info(f"Start watching output directory: {self.output_dir}") self.output_dir_observer = Observer() self.output_dir_observer.schedule(self, self.output_dir, recursive=False) self.output_dir_observer.start() def on_modified(self, event): if not os.path.isdir(event.src_path): return if event.src_path == str(self.root_directory): logger.info(f"Image directory {event.src_path} modified") self.image_dir_filenames = self._media_names(self.root_directory) self.modified_time['image'] = datetime.utcnow() elif event.src_path == str(self.output_dir): logger.info(f"Output directory {event.src_path} modified") self.output_dir_filenames = self._media_names(self.output_dir) self.modified_time['output'] = datetime.utcnow() def init_app(self, app): if self.app is None: self.app = app app.thumbnail_instance = self if not hasattr(app, "extensions"): app.extensions = {} if "thumbnail" in app.extensions: raise RuntimeError("Flask-thumbnail extension already initialized") app.extensions["thumbnail"] = self app.config.setdefault("THUMBNAIL_MEDIA_ROOT", self._default_root_directory) app.config.setdefault("THUMBNAIL_MEDIA_THUMBNAIL_ROOT", self._default_thumbnail_directory) app.config.setdefault("THUMBNAIL_MEDIA_URL", self._default_root_url) app.config.setdefault("THUMBNAIL_MEDIA_THUMBNAIL_URL", self._default_thumbnail_root_url) app.config.setdefault("THUMBNAIL_DEFAULT_FORMAT", self._default_format) def save_to_output_directory(self, image: np.ndarray, filename: str): fp = Path(filename) new_name = fp.stem + f"_{int(time.time())}" + fp.suffix if image.shape[2] == 3: image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) elif image.shape[2] == 4: image = cv2.cvtColor(image, cv2.COLOR_RGBA2BGRA) cv2.imwrite(str(self.output_dir / new_name), image) @property def root_directory(self): path = self.app.config["THUMBNAIL_MEDIA_ROOT"] if os.path.isabs(path): return path else: return os.path.join(self.app.root_path, path) @property def thumbnail_directory(self): path = self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_ROOT"] if os.path.isabs(path): return path else: return os.path.join(self.app.root_path, path) @property def root_url(self): return self.app.config["THUMBNAIL_MEDIA_URL"] @property def media_names(self): # return self.image_dir_filenames return self._media_names(self.root_directory) @property def output_media_names(self): return self._media_names(self.output_dir) # return self.output_dir_filenames @staticmethod def _media_names(directory: Path): names = sorted([it.name for it in glob_img(directory)]) res = [] for name in names: path = os.path.join(directory, name) img = Image.open(path) res.append({"name": name, "height": img.height, "width": img.width, "ctime": os.path.getctime(path)}) return res @property def thumbnail_url(self): return self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_URL"] def get_thumbnail(self, directory: Path, original_filename: str, width, height, **options): storage = FilesystemStorageBackend(self.app) crop = options.get("crop", "fit") background = options.get("background") quality = options.get("quality", 90) original_path, original_filename = os.path.split(original_filename) original_filepath = os.path.join(directory, original_path, original_filename) image = Image.open(BytesIO(storage.read(original_filepath))) # keep ratio resize if width is not None: height = int(image.height * width / image.width) else: width = int(image.width * height / image.height) thumbnail_size = (width, height) thumbnail_filename = generate_filename( original_filename, aspect_to_string(thumbnail_size), crop, background, quality ) thumbnail_filepath = os.path.join( self.thumbnail_directory, original_path, thumbnail_filename ) thumbnail_url = os.path.join(self.thumbnail_url, original_path, thumbnail_filename) if storage.exists(thumbnail_filepath): return thumbnail_url, (width, height) try: image.load() except (IOError, OSError): self.app.logger.warning("Thumbnail not load image: %s", original_filepath) return thumbnail_url, (width, height) # get original image format options["format"] = options.get("format", image.format) image = self._create_thumbnail(image, thumbnail_size, crop, background=background) raw_data = self.get_raw_data(image, **options) storage.save(thumbnail_filepath, raw_data) return thumbnail_url, (width, height) def get_raw_data(self, image, **options): data = { "format": self._get_format(image, **options), "quality": options.get("quality", 90), } _file = BytesIO() image.save(_file, **data) return _file.getvalue() @staticmethod def colormode(image, colormode="RGB"): if colormode == "RGB" or colormode == "RGBA": if image.mode == "RGBA": return image if image.mode == "LA": return image.convert("RGBA") return image.convert(colormode) if colormode == "GRAY": return image.convert("L") return image.convert(colormode) @staticmethod def background(original_image, color=0xFF): size = (max(original_image.size),) * 2 image = Image.new("L", size, color) image.paste( original_image, tuple(map(lambda x: (x[0] - x[1]) / 2, zip(size, original_image.size))), ) return image def _get_format(self, image, **options): if options.get("format"): return options.get("format") if image.format: return image.format return self.app.config["THUMBNAIL_DEFAULT_FORMAT"] def _create_thumbnail(self, image, size, crop="fit", background=None): try: resample = Image.Resampling.LANCZOS except AttributeError: # pylint: disable=raise-missing-from resample = Image.ANTIALIAS if crop == "fit": image = ImageOps.fit(image, size, resample) else: image = image.copy() image.thumbnail(size, resample=resample) if background is not None: image = self.background(image) image = self.colormode(image) return image