193 lines
6.9 KiB
Python
193 lines
6.9 KiB
Python
from typing import List, Dict
|
|
|
|
import torch
|
|
from loguru import logger
|
|
import numpy as np
|
|
|
|
from iopaint.download import scan_models
|
|
from iopaint.helper import switch_mps_device
|
|
from iopaint.model import models, ControlNet, SD, SDXL
|
|
from iopaint.model.utils import torch_gc, is_local_files_only
|
|
from iopaint.model_info import ModelInfo, ModelType
|
|
from iopaint.schema import InpaintRequest
|
|
|
|
|
|
class ModelManager:
|
|
def __init__(self, name: str, device: torch.device, **kwargs):
|
|
self.name = name
|
|
self.device = device
|
|
self.kwargs = kwargs
|
|
self.available_models: Dict[str, ModelInfo] = {}
|
|
self.scan_models()
|
|
|
|
self.enable_controlnet = kwargs.get("enable_controlnet", False)
|
|
controlnet_method = kwargs.get("controlnet_method", None)
|
|
if (
|
|
controlnet_method is None
|
|
and name in self.available_models
|
|
and self.available_models[name].support_controlnet
|
|
):
|
|
controlnet_method = self.available_models[name].controlnets[0]
|
|
self.controlnet_method = controlnet_method
|
|
self.model = self.init_model(name, device, **kwargs)
|
|
|
|
@property
|
|
def current_model(self) -> ModelInfo:
|
|
return self.available_models[self.name]
|
|
|
|
def init_model(self, name: str, device, **kwargs):
|
|
logger.info(f"Loading model: {name}")
|
|
if name not in self.available_models:
|
|
raise NotImplementedError(
|
|
f"Unsupported model: {name}. Available models: {list(self.available_models.keys())}"
|
|
)
|
|
|
|
model_info = self.available_models[name]
|
|
kwargs = {
|
|
**kwargs,
|
|
"model_info": model_info,
|
|
"enable_controlnet": self.enable_controlnet,
|
|
"controlnet_method": self.controlnet_method,
|
|
}
|
|
|
|
if model_info.support_controlnet and self.enable_controlnet:
|
|
return ControlNet(device, **kwargs)
|
|
elif model_info.name in models:
|
|
return models[name](device, **kwargs)
|
|
else:
|
|
if model_info.model_type in [
|
|
ModelType.DIFFUSERS_SD_INPAINT,
|
|
ModelType.DIFFUSERS_SD,
|
|
]:
|
|
return SD(device, **kwargs)
|
|
|
|
if model_info.model_type in [
|
|
ModelType.DIFFUSERS_SDXL_INPAINT,
|
|
ModelType.DIFFUSERS_SDXL,
|
|
]:
|
|
return SDXL(device, **kwargs)
|
|
|
|
raise NotImplementedError(f"Unsupported model: {name}")
|
|
|
|
@torch.inference_mode()
|
|
def __call__(self, image, mask, config: InpaintRequest):
|
|
"""
|
|
|
|
Args:
|
|
image: [H, W, C] RGB
|
|
mask: [H, W, 1] 255 means area to repaint
|
|
config:
|
|
|
|
Returns:
|
|
BGR image
|
|
"""
|
|
self.switch_controlnet_method(config)
|
|
self.enable_disable_freeu(config)
|
|
self.enable_disable_lcm_lora(config)
|
|
return self.model(image, mask, config).astype(np.uint8)
|
|
|
|
def scan_models(self) -> List[ModelInfo]:
|
|
available_models = scan_models()
|
|
self.available_models = {it.name: it for it in available_models}
|
|
return available_models
|
|
|
|
def switch(self, new_name: str):
|
|
if new_name == self.name:
|
|
return
|
|
|
|
old_name = self.name
|
|
old_controlnet_method = self.controlnet_method
|
|
self.name = new_name
|
|
|
|
if (
|
|
self.available_models[new_name].support_controlnet
|
|
and self.controlnet_method
|
|
not in self.available_models[new_name].controlnets
|
|
):
|
|
self.controlnet_method = self.available_models[new_name].controlnets[0]
|
|
try:
|
|
# TODO: enable/disable controlnet without reload model
|
|
del self.model
|
|
torch_gc()
|
|
|
|
self.model = self.init_model(
|
|
new_name, switch_mps_device(new_name, self.device), **self.kwargs
|
|
)
|
|
except Exception as e:
|
|
self.name = old_name
|
|
self.controlnet_method = old_controlnet_method
|
|
logger.info(f"Switch model from {old_name} to {new_name} failed, rollback")
|
|
self.model = self.init_model(
|
|
old_name, switch_mps_device(old_name, self.device), **self.kwargs
|
|
)
|
|
raise e
|
|
|
|
def switch_controlnet_method(self, config):
|
|
if not self.available_models[self.name].support_controlnet:
|
|
return
|
|
|
|
if (
|
|
self.enable_controlnet
|
|
and config.controlnet_method
|
|
and self.controlnet_method != config.controlnet_method
|
|
):
|
|
old_controlnet_method = self.controlnet_method
|
|
self.controlnet_method = config.controlnet_method
|
|
self.model.switch_controlnet_method(config.controlnet_method)
|
|
logger.info(
|
|
f"Switch Controlnet method from {old_controlnet_method} to {config.controlnet_method}"
|
|
)
|
|
elif self.enable_controlnet != config.enable_controlnet:
|
|
self.enable_controlnet = config.enable_controlnet
|
|
self.controlnet_method = config.controlnet_method
|
|
|
|
pipe_components = {
|
|
"vae": self.model.model.vae,
|
|
"text_encoder": self.model.model.text_encoder,
|
|
"unet": self.model.model.unet,
|
|
}
|
|
if hasattr(self.model.model, "text_encoder_2"):
|
|
pipe_components["text_encoder_2"] = self.model.model.text_encoder_2
|
|
|
|
self.model = self.init_model(
|
|
self.name,
|
|
switch_mps_device(self.name, self.device),
|
|
pipe_components=pipe_components,
|
|
**self.kwargs,
|
|
)
|
|
if not config.enable_controlnet:
|
|
logger.info(f"Disable controlnet")
|
|
else:
|
|
logger.info(f"Enable controlnet: {config.controlnet_method}")
|
|
|
|
def enable_disable_freeu(self, config: InpaintRequest):
|
|
if str(self.model.device) == "mps":
|
|
return
|
|
|
|
if self.available_models[self.name].support_freeu:
|
|
if config.sd_freeu:
|
|
freeu_config = config.sd_freeu_config
|
|
self.model.model.enable_freeu(
|
|
s1=freeu_config.s1,
|
|
s2=freeu_config.s2,
|
|
b1=freeu_config.b1,
|
|
b2=freeu_config.b2,
|
|
)
|
|
else:
|
|
self.model.model.disable_freeu()
|
|
|
|
def enable_disable_lcm_lora(self, config: InpaintRequest):
|
|
if self.available_models[self.name].support_lcm_lora:
|
|
# TODO: change this if load other lora is supported
|
|
lcm_lora_loaded = bool(self.model.model.get_list_adapters())
|
|
if config.sd_lcm_lora:
|
|
if not lcm_lora_loaded:
|
|
self.model.model.load_lora_weights(
|
|
self.model.lcm_lora_id,
|
|
weight_name="pytorch_lora_weights.safetensors",
|
|
local_files_only=is_local_files_only(),
|
|
)
|
|
else:
|
|
if lcm_lora_loaded:
|
|
self.model.model.disable_lora()
|