2022-12-31 14:07:08 +01:00
|
|
|
# Copy from https://github.com/silentsokolov/flask-thumbnails/blob/master/flask_thumbnails/thumbnail.py
|
|
|
|
import os
|
2023-01-08 13:53:55 +01:00
|
|
|
from datetime import datetime
|
|
|
|
|
2023-01-05 15:07:39 +01:00
|
|
|
import cv2
|
|
|
|
import time
|
2022-12-31 14:07:08 +01:00
|
|
|
from io import BytesIO
|
2023-01-05 15:07:39 +01:00
|
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
2023-01-08 13:53:55 +01:00
|
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
from watchdog.observers import Observer
|
2022-12-31 14:07:08 +01:00
|
|
|
|
|
|
|
from PIL import Image, ImageOps, PngImagePlugin
|
2023-01-08 13:53:55 +01:00
|
|
|
from loguru import logger
|
2023-01-05 15:07:39 +01:00
|
|
|
|
2022-12-31 14:07:08 +01:00
|
|
|
LARGE_ENOUGH_NUMBER = 100
|
2023-02-19 07:31:00 +01:00
|
|
|
PngImagePlugin.MAX_TEXT_CHUNK = LARGE_ENOUGH_NUMBER * (1024**2)
|
2022-12-31 14:07:08 +01:00
|
|
|
from .storage_backends import FilesystemStorageBackend
|
|
|
|
from .utils import aspect_to_string, generate_filename, glob_img
|
|
|
|
|
|
|
|
|
2023-01-08 13:53:55 +01:00
|
|
|
class FileManager(FileSystemEventHandler):
|
2022-12-31 14:07:08 +01:00
|
|
|
def __init__(self, app=None):
|
|
|
|
self.app = app
|
|
|
|
self._default_root_directory = "media"
|
|
|
|
self._default_thumbnail_directory = "media"
|
|
|
|
self._default_root_url = "/"
|
|
|
|
self._default_thumbnail_root_url = "/"
|
|
|
|
self._default_format = "JPEG"
|
2023-01-05 15:07:39 +01:00
|
|
|
self.output_dir: Path = None
|
2022-12-31 14:07:08 +01:00
|
|
|
|
|
|
|
if app is not None:
|
|
|
|
self.init_app(app)
|
|
|
|
|
2023-01-08 13:53:55 +01:00
|
|
|
self.image_dir_filenames = []
|
|
|
|
self.output_dir_filenames = []
|
|
|
|
|
|
|
|
self.image_dir_observer = None
|
|
|
|
self.output_dir_observer = None
|
|
|
|
|
|
|
|
self.modified_time = {
|
|
|
|
"image": datetime.utcnow(),
|
|
|
|
"output": datetime.utcnow(),
|
|
|
|
}
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
self.image_dir_filenames = self._media_names(self.root_directory)
|
|
|
|
self.output_dir_filenames = self._media_names(self.output_dir)
|
|
|
|
|
|
|
|
logger.info(f"Start watching image directory: {self.root_directory}")
|
|
|
|
self.image_dir_observer = Observer()
|
|
|
|
self.image_dir_observer.schedule(self, self.root_directory, recursive=False)
|
|
|
|
self.image_dir_observer.start()
|
|
|
|
|
|
|
|
logger.info(f"Start watching output directory: {self.output_dir}")
|
|
|
|
self.output_dir_observer = Observer()
|
|
|
|
self.output_dir_observer.schedule(self, self.output_dir, recursive=False)
|
|
|
|
self.output_dir_observer.start()
|
|
|
|
|
|
|
|
def on_modified(self, event):
|
|
|
|
if not os.path.isdir(event.src_path):
|
|
|
|
return
|
|
|
|
if event.src_path == str(self.root_directory):
|
|
|
|
logger.info(f"Image directory {event.src_path} modified")
|
|
|
|
self.image_dir_filenames = self._media_names(self.root_directory)
|
2023-02-19 07:31:00 +01:00
|
|
|
self.modified_time["image"] = datetime.utcnow()
|
2023-01-08 13:53:55 +01:00
|
|
|
elif event.src_path == str(self.output_dir):
|
|
|
|
logger.info(f"Output directory {event.src_path} modified")
|
|
|
|
self.output_dir_filenames = self._media_names(self.output_dir)
|
2023-02-19 07:31:00 +01:00
|
|
|
self.modified_time["output"] = datetime.utcnow()
|
2023-01-08 13:53:55 +01:00
|
|
|
|
2022-12-31 14:07:08 +01:00
|
|
|
def init_app(self, app):
|
|
|
|
if self.app is None:
|
|
|
|
self.app = app
|
|
|
|
app.thumbnail_instance = self
|
|
|
|
|
|
|
|
if not hasattr(app, "extensions"):
|
|
|
|
app.extensions = {}
|
|
|
|
|
|
|
|
if "thumbnail" in app.extensions:
|
|
|
|
raise RuntimeError("Flask-thumbnail extension already initialized")
|
|
|
|
|
|
|
|
app.extensions["thumbnail"] = self
|
|
|
|
|
|
|
|
app.config.setdefault("THUMBNAIL_MEDIA_ROOT", self._default_root_directory)
|
2023-02-19 07:31:00 +01:00
|
|
|
app.config.setdefault(
|
|
|
|
"THUMBNAIL_MEDIA_THUMBNAIL_ROOT", self._default_thumbnail_directory
|
|
|
|
)
|
2022-12-31 14:07:08 +01:00
|
|
|
app.config.setdefault("THUMBNAIL_MEDIA_URL", self._default_root_url)
|
2023-02-19 07:31:00 +01:00
|
|
|
app.config.setdefault(
|
|
|
|
"THUMBNAIL_MEDIA_THUMBNAIL_URL", self._default_thumbnail_root_url
|
|
|
|
)
|
2022-12-31 14:07:08 +01:00
|
|
|
app.config.setdefault("THUMBNAIL_DEFAULT_FORMAT", self._default_format)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def root_directory(self):
|
|
|
|
path = self.app.config["THUMBNAIL_MEDIA_ROOT"]
|
|
|
|
|
|
|
|
if os.path.isabs(path):
|
|
|
|
return path
|
|
|
|
else:
|
|
|
|
return os.path.join(self.app.root_path, path)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def thumbnail_directory(self):
|
|
|
|
path = self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_ROOT"]
|
|
|
|
|
|
|
|
if os.path.isabs(path):
|
|
|
|
return path
|
|
|
|
else:
|
|
|
|
return os.path.join(self.app.root_path, path)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def root_url(self):
|
|
|
|
return self.app.config["THUMBNAIL_MEDIA_URL"]
|
|
|
|
|
|
|
|
@property
|
|
|
|
def media_names(self):
|
2023-01-08 14:59:26 +01:00
|
|
|
# return self.image_dir_filenames
|
|
|
|
return self._media_names(self.root_directory)
|
2023-01-07 13:51:05 +01:00
|
|
|
|
|
|
|
@property
|
|
|
|
def output_media_names(self):
|
2023-01-08 14:59:26 +01:00
|
|
|
return self._media_names(self.output_dir)
|
|
|
|
# return self.output_dir_filenames
|
2023-01-07 13:51:05 +01:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _media_names(directory: Path):
|
|
|
|
names = sorted([it.name for it in glob_img(directory)])
|
2022-12-31 14:07:08 +01:00
|
|
|
res = []
|
|
|
|
for name in names:
|
2023-01-07 13:51:05 +01:00
|
|
|
path = os.path.join(directory, name)
|
2023-01-07 01:52:11 +01:00
|
|
|
img = Image.open(path)
|
2023-02-19 07:31:00 +01:00
|
|
|
res.append(
|
|
|
|
{
|
|
|
|
"name": name,
|
|
|
|
"height": img.height,
|
|
|
|
"width": img.width,
|
|
|
|
"ctime": os.path.getctime(path),
|
|
|
|
}
|
|
|
|
)
|
2022-12-31 14:07:08 +01:00
|
|
|
return res
|
|
|
|
|
|
|
|
@property
|
|
|
|
def thumbnail_url(self):
|
|
|
|
return self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_URL"]
|
|
|
|
|
2023-02-19 07:31:00 +01:00
|
|
|
def get_thumbnail(
|
|
|
|
self, directory: Path, original_filename: str, width, height, **options
|
|
|
|
):
|
2022-12-31 14:07:08 +01:00
|
|
|
storage = FilesystemStorageBackend(self.app)
|
|
|
|
crop = options.get("crop", "fit")
|
|
|
|
background = options.get("background")
|
|
|
|
quality = options.get("quality", 90)
|
|
|
|
|
|
|
|
original_path, original_filename = os.path.split(original_filename)
|
2023-01-07 13:51:05 +01:00
|
|
|
original_filepath = os.path.join(directory, original_path, original_filename)
|
2022-12-31 14:07:08 +01:00
|
|
|
image = Image.open(BytesIO(storage.read(original_filepath)))
|
|
|
|
|
|
|
|
# keep ratio resize
|
|
|
|
if width is not None:
|
|
|
|
height = int(image.height * width / image.width)
|
|
|
|
else:
|
|
|
|
width = int(image.width * height / image.height)
|
|
|
|
|
|
|
|
thumbnail_size = (width, height)
|
|
|
|
|
|
|
|
thumbnail_filename = generate_filename(
|
2023-02-19 07:31:00 +01:00
|
|
|
original_filename,
|
|
|
|
aspect_to_string(thumbnail_size),
|
|
|
|
crop,
|
|
|
|
background,
|
|
|
|
quality,
|
2022-12-31 14:07:08 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
thumbnail_filepath = os.path.join(
|
|
|
|
self.thumbnail_directory, original_path, thumbnail_filename
|
|
|
|
)
|
2023-02-19 07:31:00 +01:00
|
|
|
thumbnail_url = os.path.join(
|
|
|
|
self.thumbnail_url, original_path, thumbnail_filename
|
|
|
|
)
|
2022-12-31 14:07:08 +01:00
|
|
|
|
|
|
|
if storage.exists(thumbnail_filepath):
|
|
|
|
return thumbnail_url, (width, height)
|
|
|
|
|
|
|
|
try:
|
|
|
|
image.load()
|
|
|
|
except (IOError, OSError):
|
|
|
|
self.app.logger.warning("Thumbnail not load image: %s", original_filepath)
|
|
|
|
return thumbnail_url, (width, height)
|
|
|
|
|
|
|
|
# get original image format
|
|
|
|
options["format"] = options.get("format", image.format)
|
|
|
|
|
2023-02-19 07:31:00 +01:00
|
|
|
image = self._create_thumbnail(
|
|
|
|
image, thumbnail_size, crop, background=background
|
|
|
|
)
|
2022-12-31 14:07:08 +01:00
|
|
|
|
|
|
|
raw_data = self.get_raw_data(image, **options)
|
|
|
|
storage.save(thumbnail_filepath, raw_data)
|
|
|
|
|
|
|
|
return thumbnail_url, (width, height)
|
|
|
|
|
|
|
|
def get_raw_data(self, image, **options):
|
|
|
|
data = {
|
|
|
|
"format": self._get_format(image, **options),
|
|
|
|
"quality": options.get("quality", 90),
|
|
|
|
}
|
|
|
|
|
|
|
|
_file = BytesIO()
|
|
|
|
image.save(_file, **data)
|
|
|
|
return _file.getvalue()
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def colormode(image, colormode="RGB"):
|
|
|
|
if colormode == "RGB" or colormode == "RGBA":
|
|
|
|
if image.mode == "RGBA":
|
|
|
|
return image
|
|
|
|
if image.mode == "LA":
|
|
|
|
return image.convert("RGBA")
|
|
|
|
return image.convert(colormode)
|
|
|
|
|
|
|
|
if colormode == "GRAY":
|
|
|
|
return image.convert("L")
|
|
|
|
|
|
|
|
return image.convert(colormode)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def background(original_image, color=0xFF):
|
|
|
|
size = (max(original_image.size),) * 2
|
|
|
|
image = Image.new("L", size, color)
|
|
|
|
image.paste(
|
|
|
|
original_image,
|
|
|
|
tuple(map(lambda x: (x[0] - x[1]) / 2, zip(size, original_image.size))),
|
|
|
|
)
|
|
|
|
|
|
|
|
return image
|
|
|
|
|
|
|
|
def _get_format(self, image, **options):
|
|
|
|
if options.get("format"):
|
|
|
|
return options.get("format")
|
|
|
|
if image.format:
|
|
|
|
return image.format
|
|
|
|
|
|
|
|
return self.app.config["THUMBNAIL_DEFAULT_FORMAT"]
|
|
|
|
|
|
|
|
def _create_thumbnail(self, image, size, crop="fit", background=None):
|
|
|
|
try:
|
|
|
|
resample = Image.Resampling.LANCZOS
|
|
|
|
except AttributeError: # pylint: disable=raise-missing-from
|
|
|
|
resample = Image.ANTIALIAS
|
|
|
|
|
|
|
|
if crop == "fit":
|
|
|
|
image = ImageOps.fit(image, size, resample)
|
|
|
|
else:
|
|
|
|
image = image.copy()
|
|
|
|
image.thumbnail(size, resample=resample)
|
|
|
|
|
|
|
|
if background is not None:
|
|
|
|
image = self.background(image)
|
|
|
|
|
|
|
|
image = self.colormode(image)
|
|
|
|
|
|
|
|
return image
|