IOPaint/lama_cleaner/file_manager/file_manager.py
2023-01-08 21:59:26 +08:00

254 lines
8.6 KiB
Python

# Copied from https://github.com/silentsokolov/flask-thumbnails/blob/master/flask_thumbnails/thumbnail.py
import os
from datetime import datetime
import cv2
import time
from io import BytesIO
from pathlib import Path
import numpy as np
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from PIL import Image, ImageOps, PngImagePlugin
from loguru import logger
# Override Pillow's PNG text-chunk size limit with 100 * 1024**2 bytes (100 MiB).
# NOTE(review): presumably so PNG files carrying large embedded text metadata
# can still be opened -- confirm against the Pillow default and the callers.
LARGE_ENOUGH_NUMBER = 100
PngImagePlugin.MAX_TEXT_CHUNK = LARGE_ENOUGH_NUMBER * (1024 ** 2)
from .storage_backends import FilesystemStorageBackend
from .utils import aspect_to_string, generate_filename, glob_img
class FileManager(FileSystemEventHandler):
    """List, watch and thumbnail the images of an input and an output directory.

    The instance registers itself on a Flask-style ``app`` (see ``init_app``),
    reads its directories/URLs from ``app.config`` and doubles as the watchdog
    event handler for both watched directories.
    """

    def __init__(self, app=None):
        """Set defaults and, when ``app`` is given, bind to it immediately."""
        self.app = app
        self._default_root_directory = "media"
        self._default_thumbnail_directory = "media"
        self._default_root_url = "/"
        self._default_thumbnail_root_url = "/"
        self._default_format = "JPEG"
        # Must be assigned by the caller before ``start`` or
        # ``save_to_output_directory`` is used.
        self.output_dir: Path = None

        if app is not None:
            self.init_app(app)

        # Listings cached by ``start`` and refreshed by ``on_modified``.
        self.image_dir_filenames = []
        self.output_dir_filenames = []

        self.image_dir_observer = None
        self.output_dir_observer = None

        # Naive UTC timestamps of the last detected change per directory.
        self.modified_time = {
            "image": datetime.utcnow(),
            "output": datetime.utcnow(),
        }

    def start(self):
        """Take an initial listing of both directories and start watching them."""
        self.image_dir_filenames = self._media_names(self.root_directory)
        self.output_dir_filenames = self._media_names(self.output_dir)

        logger.info(f"Start watching image directory: {self.root_directory}")
        self.image_dir_observer = Observer()
        self.image_dir_observer.schedule(self, self.root_directory, recursive=False)
        self.image_dir_observer.start()

        logger.info(f"Start watching output directory: {self.output_dir}")
        self.output_dir_observer = Observer()
        self.output_dir_observer.schedule(self, self.output_dir, recursive=False)
        self.output_dir_observer.start()

    def on_modified(self, event):
        """Refresh the cached listing when a watched directory itself changes.

        Events whose ``src_path`` is not a directory are ignored; only events
        for the image directory or the output directory trigger a rescan.
        """
        if not os.path.isdir(event.src_path):
            return

        if event.src_path == str(self.root_directory):
            logger.info(f"Image directory {event.src_path} modified")
            self.image_dir_filenames = self._media_names(self.root_directory)
            self.modified_time["image"] = datetime.utcnow()
        elif event.src_path == str(self.output_dir):
            logger.info(f"Output directory {event.src_path} modified")
            self.output_dir_filenames = self._media_names(self.output_dir)
            self.modified_time["output"] = datetime.utcnow()

    def init_app(self, app):
        """Register this instance on ``app`` and install config defaults.

        Raises:
            RuntimeError: if ``app.extensions`` already has a ``"thumbnail"`` entry.
        """
        if self.app is None:
            self.app = app
        app.thumbnail_instance = self

        if not hasattr(app, "extensions"):
            app.extensions = {}

        if "thumbnail" in app.extensions:
            raise RuntimeError("Flask-thumbnail extension already initialized")

        app.extensions["thumbnail"] = self

        app.config.setdefault("THUMBNAIL_MEDIA_ROOT", self._default_root_directory)
        app.config.setdefault("THUMBNAIL_MEDIA_THUMBNAIL_ROOT", self._default_thumbnail_directory)
        app.config.setdefault("THUMBNAIL_MEDIA_URL", self._default_root_url)
        app.config.setdefault("THUMBNAIL_MEDIA_THUMBNAIL_URL", self._default_thumbnail_root_url)
        app.config.setdefault("THUMBNAIL_DEFAULT_FORMAT", self._default_format)

    def save_to_output_directory(self, image: np.ndarray, filename: str):
        """Write ``image`` into ``output_dir`` under a timestamped file name.

        The name is ``<stem>_<unix seconds><suffix>`` derived from ``filename``.
        Three- and four-channel arrays are treated as RGB/RGBA and converted to
        the BGR/BGRA order ``cv2.imwrite`` expects; any other array (including
        2-D single-channel images) is written unchanged.
        """
        fp = Path(filename)
        new_name = fp.stem + f"_{int(time.time())}" + fp.suffix

        # Only 3-D arrays have a channel axis; guarding on ``ndim`` keeps 2-D
        # grayscale input from failing on ``shape[2]``.
        if image.ndim == 3:
            channels = image.shape[2]
            if channels == 3:
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            elif channels == 4:
                image = cv2.cvtColor(image, cv2.COLOR_RGBA2BGRA)

        cv2.imwrite(str(self.output_dir / new_name), image)

    @property
    def root_directory(self):
        """Image directory from ``THUMBNAIL_MEDIA_ROOT``; relative paths resolve against ``app.root_path``."""
        path = self.app.config["THUMBNAIL_MEDIA_ROOT"]
        if os.path.isabs(path):
            return path
        return os.path.join(self.app.root_path, path)

    @property
    def thumbnail_directory(self):
        """Thumbnail directory from ``THUMBNAIL_MEDIA_THUMBNAIL_ROOT``; relative paths resolve against ``app.root_path``."""
        path = self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_ROOT"]
        if os.path.isabs(path):
            return path
        return os.path.join(self.app.root_path, path)

    @property
    def root_url(self):
        """URL prefix configured as ``THUMBNAIL_MEDIA_URL``."""
        return self.app.config["THUMBNAIL_MEDIA_URL"]

    @property
    def media_names(self):
        """Fresh listing of the image directory.

        The directory is re-scanned on every access; the ``image_dir_filenames``
        list maintained by the watcher is not used here.
        """
        return self._media_names(self.root_directory)

    @property
    def output_media_names(self):
        """Fresh listing of the output directory.

        The directory is re-scanned on every access; the ``output_dir_filenames``
        list maintained by the watcher is not used here.
        """
        return self._media_names(self.output_dir)

    @staticmethod
    def _media_names(directory: Path):
        """Describe every image ``glob_img`` finds in ``directory``.

        Returns:
            A list of dicts sorted by file name, each with the keys ``name``,
            ``height``, ``width`` and ``ctime`` (from ``os.path.getctime``).
        """
        names = sorted(it.name for it in glob_img(directory))
        res = []
        for name in names:
            path = os.path.join(directory, name)
            # Close each file right after reading its size; this method runs on
            # every directory change and must not accumulate open handles.
            with Image.open(path) as img:
                width, height = img.width, img.height
            res.append(
                {
                    "name": name,
                    "height": height,
                    "width": width,
                    "ctime": os.path.getctime(path),
                }
            )
        return res

    @property
    def thumbnail_url(self):
        """URL prefix configured as ``THUMBNAIL_MEDIA_THUMBNAIL_URL``."""
        return self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_URL"]

    def get_thumbnail(self, directory: Path, original_filename: str, width, height, **options):
        """Return the URL and size of a thumbnail, creating the file if needed.

        Args:
            directory: directory that contains ``original_filename``.
            original_filename: image path relative to ``directory``; any
                sub-path is mirrored below the thumbnail directory and URL.
            width: target width. When not ``None`` the height is recomputed
                from the image's aspect ratio and the passed ``height`` is
                ignored.
            height: target height, used to derive the width when ``width`` is
                ``None``.
            **options: ``crop`` (default ``"fit"``), ``background``,
                ``quality`` (default ``90``) and ``format`` (defaults to the
                source image's format).

        Returns:
            ``(thumbnail_url, (width, height))``. The same tuple is returned
            when the thumbnail already exists or when the source image cannot
            be loaded (a warning is logged and nothing is written).

        Raises:
            TypeError: if both ``width`` and ``height`` are ``None``.
        """
        storage = FilesystemStorageBackend(self.app)
        crop = options.get("crop", "fit")
        background = options.get("background")
        quality = options.get("quality", 90)

        original_path, original_filename = os.path.split(original_filename)
        original_filepath = os.path.join(directory, original_path, original_filename)
        image = Image.open(BytesIO(storage.read(original_filepath)))

        # Keep the aspect ratio: derive the missing/secondary dimension.
        if width is not None:
            height = int(image.height * width / image.width)
        else:
            width = int(image.width * height / image.height)

        thumbnail_size = (width, height)

        thumbnail_filename = generate_filename(
            original_filename, aspect_to_string(thumbnail_size), crop, background, quality
        )
        thumbnail_filepath = os.path.join(
            self.thumbnail_directory, original_path, thumbnail_filename
        )
        thumbnail_url = os.path.join(self.thumbnail_url, original_path, thumbnail_filename)

        # A thumbnail generated earlier with the same parameters is reused.
        if storage.exists(thumbnail_filepath):
            return thumbnail_url, (width, height)

        try:
            image.load()
        except (IOError, OSError):
            self.app.logger.warning("Thumbnail not load image: %s", original_filepath)
            return thumbnail_url, (width, height)

        # Remember the source format before the image object is replaced by
        # the resized one.
        options["format"] = options.get("format", image.format)

        image = self._create_thumbnail(image, thumbnail_size, crop, background=background)

        raw_data = self.get_raw_data(image, **options)
        storage.save(thumbnail_filepath, raw_data)

        return thumbnail_url, (width, height)

    def get_raw_data(self, image, **options):
        """Encode ``image`` in memory and return the resulting bytes.

        The format comes from ``_get_format``; ``quality`` defaults to ``90``.
        """
        data = {
            "format": self._get_format(image, **options),
            "quality": options.get("quality", 90),
        }

        _file = BytesIO()
        image.save(_file, **data)
        return _file.getvalue()

    @staticmethod
    def colormode(image, colormode="RGB"):
        """Convert ``image`` to ``colormode`` while keeping existing alpha.

        For ``"RGB"``/``"RGBA"`` an ``RGBA`` image is returned as is and an
        ``LA`` image becomes ``RGBA``. ``"GRAY"`` maps to mode ``"L"``. Any
        other value is passed to ``image.convert`` unchanged.
        """
        if colormode == "RGB" or colormode == "RGBA":
            if image.mode == "RGBA":
                return image
            if image.mode == "LA":
                return image.convert("RGBA")
            return image.convert(colormode)

        if colormode == "GRAY":
            return image.convert("L")

        return image.convert(colormode)

    @staticmethod
    def background(original_image, color=0xFF):
        """Center ``original_image`` on a square ``"L"``-mode canvas.

        The canvas side equals the larger dimension of the image and is
        filled with ``color`` before the image is pasted onto it.
        """
        size = (max(original_image.size),) * 2
        image = Image.new("L", size, color)
        # Floor division keeps the paste offsets integral, as ``paste`` expects
        # pixel coordinates.
        offset = tuple(
            (canvas_dim - image_dim) // 2
            for canvas_dim, image_dim in zip(size, original_image.size)
        )
        image.paste(original_image, offset)

        return image

    def _get_format(self, image, **options):
        """Pick the output format: explicit option, then image format, then config default."""
        if options.get("format"):
            return options.get("format")
        if image.format:
            return image.format

        return self.app.config["THUMBNAIL_DEFAULT_FORMAT"]

    def _create_thumbnail(self, image, size, crop="fit", background=None):
        """Resize ``image`` to ``size`` and normalise its colour mode.

        With ``crop == "fit"`` the image is cropped/scaled via ``ImageOps.fit``;
        otherwise a copy is shrunk in place with ``Image.thumbnail``. When
        ``background`` is not ``None`` the result is centered on a square
        canvas (the value itself is not forwarded to ``self.background``).
        """
        try:
            resample = Image.Resampling.LANCZOS
        except AttributeError:
            # Fallback for Pillow builds that lack the ``Image.Resampling`` enum.
            resample = Image.ANTIALIAS

        if crop == "fit":
            image = ImageOps.fit(image, size, resample)
        else:
            image = image.copy()
            image.thumbnail(size, resample=resample)

        if background is not None:
            image = self.background(image)

        image = self.colormode(image)

        return image