IOPaint/lama_cleaner/model_manager.py

154 lines
5.6 KiB
Python
Raw Normal View History

2023-12-01 03:15:35 +01:00
from typing import List, Dict
2022-11-14 11:19:50 +01:00
2023-12-01 03:15:35 +01:00
import torch
2023-05-13 07:45:27 +02:00
from loguru import logger
2023-12-19 06:16:30 +01:00
from lama_cleaner.const import DEFAULT_SD_CONTROLNET_METHOD
2023-12-01 03:15:35 +01:00
from lama_cleaner.download import scan_models
2023-02-11 06:30:09 +01:00
from lama_cleaner.helper import switch_mps_device
2023-12-01 03:15:35 +01:00
from lama_cleaner.model import models, ControlNet, SD, SDXL
2023-05-11 15:51:58 +02:00
from lama_cleaner.model.utils import torch_gc
2023-12-01 03:15:35 +01:00
from lama_cleaner.schema import Config, ModelInfo, ModelType
2022-04-15 18:11:51 +02:00
2022-07-14 10:49:03 +02:00
class ModelManager:
    """Hold the currently loaded model and swap or reconfigure it on demand.

    The manager keeps a name -> ``ModelInfo`` registry built from
    ``scan_models()``, constructs the wrapper object for the selected model,
    and, before every inference call, applies the ControlNet / FreeU /
    LCM-LoRA settings carried by the request config.
    """

    def __init__(self, name: str, device: torch.device, **kwargs):
        """Scan the available models and load ``name`` on ``device``.

        Args:
            name: Name of the model to load.
            device: Device the model is created on.
            **kwargs: Stored and forwarded to the model constructor on every
                (re)load. ``sd_controlnet`` and ``sd_controlnet_method`` are
                additionally read here to seed the ControlNet state.
        """
        self.name = name
        self.device = device
        self.kwargs = kwargs
        self.available_models: Dict[str, ModelInfo] = {}
        self.scan_models()

        self.sd_controlnet = kwargs.get("sd_controlnet", False)
        self.sd_controlnet_method = kwargs.get(
            "sd_controlnet_method", DEFAULT_SD_CONTROLNET_METHOD
        )
        self.model = self.init_model(name, device, **kwargs)

    def init_model(self, name: str, device, **kwargs):
        """Build the model wrapper for ``name`` on ``device``.

        Args:
            name: Model name. A key of ``models`` whose class defines
                ``model_id_or_path`` is treated as an alias and replaced by
                that identifier before the registry lookup.
            device: Device passed to the wrapper constructor.
            **kwargs: Extra constructor arguments; ``model_info``,
                ``sd_controlnet`` and ``sd_controlnet_method`` are always
                overridden with the manager's current values.

        Returns:
            The constructed model wrapper.

        Raises:
            NotImplementedError: If the (resolved) name is not among the
                scanned models or its model type has no matching wrapper.
        """
        # Direct lookup instead of scanning every registry entry.
        alias_cls = models.get(name)
        if alias_cls is not None and hasattr(alias_cls, "model_id_or_path"):
            name = alias_cls.model_id_or_path

        if name not in self.available_models:
            raise NotImplementedError(f"Unsupported model: {name}")
        model_info = self.available_models[name]

        kwargs = {
            **kwargs,
            "model_info": model_info,
            "sd_controlnet": self.sd_controlnet,
            "sd_controlnet_method": self.sd_controlnet_method,
        }

        if model_info.model_type in [ModelType.INPAINT, ModelType.DIFFUSERS_OTHER]:
            return models[name](device, **kwargs)

        # Only wrap with ControlNet when the model actually supports it. The
        # flag is manager-wide and survives switch(), so without this check a
        # model lacking ControlNet support would still get the wrapper (and
        # switch_controlnet_method() would refuse to turn it off again).
        if self.sd_controlnet and model_info.support_controlnet:
            return ControlNet(device, **kwargs)

        if model_info.model_type in [
            ModelType.DIFFUSERS_SD_INPAINT,
            ModelType.DIFFUSERS_SD,
        ]:
            return SD(device, **kwargs)

        if model_info.model_type in [
            ModelType.DIFFUSERS_SDXL_INPAINT,
            ModelType.DIFFUSERS_SDXL,
        ]:
            return SDXL(device, **kwargs)

        raise NotImplementedError(f"Unsupported model: {name}")

    def __call__(self, image, mask, config: "Config"):
        """Apply the per-request settings from ``config`` and run the model.

        ControlNet, FreeU and LCM-LoRA state are synchronised with ``config``
        first; the result of ``self.model(image, mask, config)`` is returned.
        """
        self.switch_controlnet_method(config)
        self.enable_disable_freeu(config)
        self.enable_disable_lcm_lora(config)
        return self.model(image, mask, config)

    def scan_models(self) -> "List[ModelInfo]":
        """Refresh ``available_models`` from ``scan_models()`` and return the scan result."""
        available_models = scan_models()
        self.available_models = {it.name: it for it in available_models}
        return available_models

    def switch(self, new_name: str):
        """Replace the loaded model with ``new_name``.

        Does nothing when ``new_name`` is already active. If loading the new
        model fails, the previous name and ControlNet method are restored, the
        previous model is loaded again and the original error is re-raised.

        Raises:
            KeyError: If ``new_name`` is not in ``available_models``; the
                manager state is left untouched in that case.
        """
        if new_name == self.name:
            return

        # Resolve everything that can fail *before* mutating any state, so an
        # unknown name cannot leave self.name pointing at a model that was
        # never loaded.
        new_info = self.available_models[new_name]
        new_method = self.sd_controlnet_method
        if new_info.support_controlnet and new_method not in new_info.controlnets:
            # The current method is not offered by the new model: fall back
            # to the first method it lists.
            new_method = new_info.controlnets[0]

        old_name = self.name
        old_sd_controlnet_method = self.sd_controlnet_method
        self.name = new_name
        self.sd_controlnet_method = new_method

        try:
            # Release the old model first so both are never resident at once.
            del self.model
            torch_gc()

            self.model = self.init_model(
                new_name, switch_mps_device(new_name, self.device), **self.kwargs
            )
        except Exception:
            self.name = old_name
            self.sd_controlnet_method = old_sd_controlnet_method
            logger.info(f"Switch model from {old_name} to {new_name} failed, rollback")
            self.model = self.init_model(
                old_name, switch_mps_device(old_name, self.device), **self.kwargs
            )
            # Bare raise keeps the original traceback intact.
            raise

    def switch_controlnet_method(self, config):
        """Synchronise the ControlNet state with ``config``.

        No-op for models without ControlNet support. Otherwise:

        * if ``config.controlnet_enabled`` differs from the current state, the
          model is rebuilt with ControlNet enabled or disabled;
        * else, if ControlNet is enabled and ``config.controlnet_method`` is
          set and differs from the current method, the running model is asked
          to switch method.
        """
        if not self.available_models[self.name].support_controlnet:
            return

        # The on/off toggle is checked first: a request that disables
        # ControlNet must not be handled as a mere method change just because
        # it also names a different method.
        if self.sd_controlnet != config.controlnet_enabled:
            previous_state = (self.sd_controlnet, self.sd_controlnet_method)
            self.sd_controlnet = config.controlnet_enabled
            self.sd_controlnet_method = config.controlnet_method

            try:
                self.model = self.init_model(
                    self.name, switch_mps_device(self.name, self.device), **self.kwargs
                )
            except Exception:
                # self.model was not reassigned, so restoring the flags keeps
                # the manager consistent with the model that is still loaded.
                self.sd_controlnet, self.sd_controlnet_method = previous_state
                raise

            if not config.controlnet_enabled:
                logger.info("Disable controlnet")
            else:
                logger.info(f"Enable controlnet: {config.controlnet_method}")
        elif (
            self.sd_controlnet
            and config.controlnet_method
            and self.sd_controlnet_method != config.controlnet_method
        ):
            old_sd_controlnet_method = self.sd_controlnet_method
            # Record the new method only after the model accepted the switch.
            self.model.switch_controlnet_method(config.controlnet_method)
            self.sd_controlnet_method = config.controlnet_method
            logger.info(
                f"Switch Controlnet method from {old_sd_controlnet_method} to {config.controlnet_method}"
            )

    def enable_disable_freeu(self, config: "Config"):
        """Enable or disable FreeU on the pipeline according to ``config.sd_freeu``.

        Skipped entirely on MPS devices and for models without FreeU support.
        """
        # Prefix match so indexed devices such as "mps:0" are skipped as well.
        # NOTE(review): FreeU is presumably unsupported on MPS - confirm.
        if str(self.model.device).startswith("mps"):
            return

        if not self.available_models[self.name].support_freeu:
            return

        if config.sd_freeu:
            freeu_config = config.sd_freeu_config
            self.model.model.enable_freeu(
                s1=freeu_config.s1,
                s2=freeu_config.s2,
                b1=freeu_config.b1,
                b2=freeu_config.b2,
            )
        else:
            self.model.model.disable_freeu()

    def enable_disable_lcm_lora(self, config: "Config"):
        """Load, enable or disable the LCM LoRA according to ``config.sd_lcm_lora``.

        Skipped for models without LCM-LoRA support. The weights are loaded on
        first use; afterwards the adapter is only toggled on and off.
        """
        if not self.available_models[self.name].support_lcm_lora:
            return

        pipe = self.model.model
        lora_loaded = bool(pipe.get_list_adapters())

        if config.sd_lcm_lora:
            if not lora_loaded:
                pipe.load_lora_weights(self.model.lcm_lora_id)
            else:
                # Already loaded but possibly switched off by an earlier
                # request: it has to be re-enabled explicitly.
                pipe.enable_lora()
        elif lora_loaded:
            # Nothing to disable when no adapter has been loaded yet.
            pipe.disable_lora()