2022-12-16 16:14:15 +01:00
|
|
|
import gc
|
2023-12-01 03:15:35 +01:00
|
|
|
from typing import List, Dict
|
2022-11-14 11:19:50 +01:00
|
|
|
|
2023-12-01 03:15:35 +01:00
|
|
|
import torch
|
2023-05-13 07:45:27 +02:00
|
|
|
from loguru import logger
|
|
|
|
|
2023-12-01 03:15:35 +01:00
|
|
|
from lama_cleaner.download import scan_models
|
2023-02-11 06:30:09 +01:00
|
|
|
from lama_cleaner.helper import switch_mps_device
|
2023-12-01 03:15:35 +01:00
|
|
|
from lama_cleaner.model import models, ControlNet, SD, SDXL
|
2023-05-11 15:51:58 +02:00
|
|
|
from lama_cleaner.model.utils import torch_gc
|
2023-12-01 03:15:35 +01:00
|
|
|
from lama_cleaner.schema import Config, ModelInfo, ModelType
|
2022-04-15 18:11:51 +02:00
|
|
|
|
|
|
|
|
2022-07-14 10:49:03 +02:00
|
|
|
class ModelManager:
    """Hold one loaded model and forward ``(image, mask, config)`` calls to it.

    The manager keeps the model name, target device and constructor kwargs so
    the model can be rebuilt when the caller switches model or ControlNet
    method. ``available_models`` maps model names to the ``ModelInfo`` entries
    returned by ``scan_models()``.
    """

    def __init__(self, name: str, device: torch.device, **kwargs):
        """Scan the available models and load the one called ``name``.

        Args:
            name: Model name, or an alias that is a key of ``models``.
            device: Device handed to the model constructor.
            **kwargs: Extra constructor options; kept so the model can be
                rebuilt later with the same settings.

        Raises:
            NotImplementedError: If ``name`` cannot be mapped to a model.
        """
        self.name = name
        self.device = device
        self.kwargs = kwargs
        self.available_models: Dict[str, ModelInfo] = {}
        self.scan_models()
        self.model = self.init_model(name, device, **kwargs)

    def _resolve_name(self, name: str) -> str:
        """Map an alias from ``models`` to its ``model_id_or_path``.

        The lookup is done once, so the result does not depend on the
        iteration order of ``models``. Names that are not aliases are returned
        unchanged.
        """
        if name in models:
            model_cls = models[name]
            if hasattr(model_cls, "model_id_or_path"):
                return model_cls.model_id_or_path
        return name

    def _current_model_info(self) -> ModelInfo:
        """Return the ``ModelInfo`` entry for the active model.

        ``self.name`` may still hold the alias the caller passed in, while
        ``available_models`` is keyed by the scanned names, so fall back to
        the resolved name when the direct lookup misses.

        Raises:
            KeyError: If neither the name nor its resolved form is known.
        """
        info = self.available_models.get(self.name)
        if info is None:
            info = self.available_models[self._resolve_name(self.name)]
        return info

    def _reload_if_unloaded(self):
        """Best-effort rebuild of ``self.model`` after a failed reload.

        Only acts when the ``model`` attribute was deleted. A failure here is
        logged rather than raised so that the exception that triggered the
        rollback is the one the caller sees.
        """
        if hasattr(self, "model"):
            return
        try:
            self.model = self.init_model(
                self.name, switch_mps_device(self.name, self.device), **self.kwargs
            )
        except Exception:
            logger.exception(f"Failed to restore model {self.name}")

    def init_model(self, name: str, device, **kwargs):
        """Build the model object for ``name``.

        Args:
            name: Model name or alias.
            device: Device passed to the model constructor.
            **kwargs: Constructor options; ``sd_controlnet`` selects the
                ControlNet wrapper for diffusers Stable Diffusion models.

        Returns:
            The constructed model instance.

        Raises:
            NotImplementedError: If the model is unknown, has an unhandled
                model type, or is a non-inpaint SD/SDXL model requested
                without ControlNet.
        """
        name = self._resolve_name(name)
        if name not in self.available_models:
            raise NotImplementedError(f"Unsupported model: {name}")

        model_info = self.available_models[name]
        model_type = model_info.model_type

        # These types are constructed straight from the ``models`` registry.
        if model_type in (ModelType.INPAINT, ModelType.DIFFUSERS_OTHER):
            return models[name](device, **kwargs)

        if kwargs.get("sd_controlnet", False):
            return ControlNet(device, **{**kwargs, "model_info": model_info})

        if model_type in (ModelType.DIFFUSERS_SD, ModelType.DIFFUSERS_SDXL):
            raise NotImplementedError(
                "When using non inpaint Stable Diffusion model, you must enable controlnet"
            )

        if model_type == ModelType.DIFFUSERS_SD_INPAINT:
            return SD(device, model_id_or_path=model_info.path, **kwargs)

        if model_type == ModelType.DIFFUSERS_SDXL_INPAINT:
            return SDXL(device, model_id_or_path=model_info.path, **kwargs)

        raise NotImplementedError(f"Unsupported model: {name}")

    def __call__(self, image, mask, config: Config):
        """Apply the per-request settings from ``config`` and run the model.

        Returns:
            Whatever the underlying model returns for ``(image, mask, config)``.
        """
        self.switch_controlnet_method(control_method=config.controlnet_method)
        self.enable_disable_freeu(config)
        self.enable_disable_lcm_lora(config)
        return self.model(image, mask, config)

    def scan_models(self) -> List[ModelInfo]:
        """Refresh ``available_models`` from ``scan_models()`` and return the list."""
        available_models = scan_models()
        self.available_models = {it.name: it for it in available_models}
        return available_models

    def switch(self, new_name: str):
        """Replace the loaded model with ``new_name``.

        Does nothing when ``new_name`` is already the active name. On failure
        the previous name is restored, the previous model is reloaded if it
        had already been released, and the original exception is re-raised.
        """
        if new_name == self.name:
            return

        previous_name = self.name
        self.name = new_name

        try:
            if torch.cuda.memory_allocated() > 0:
                # Drop the reference first: emptying the CUDA cache while the
                # old model is still referenced cannot release its memory.
                del self.model
                gc.collect()
                torch.cuda.empty_cache()

            self.model = self.init_model(
                new_name, switch_mps_device(new_name, self.device), **self.kwargs
            )
        except Exception:
            self.name = previous_name
            self._reload_if_unloaded()
            raise

    def switch_controlnet_method(self, control_method: str):
        """Rebuild the model when the requested ControlNet method changes.

        No-op when ControlNet is disabled in ``kwargs``, when the method is
        unchanged, or when the active model reports no ControlNet support.
        On failure the previous method is put back in ``kwargs``, a reload of
        the model is attempted, and the original exception is re-raised.
        """
        if not self.kwargs.get("sd_controlnet"):
            return

        previous_method = self.kwargs["sd_controlnet_method"]
        if previous_method == control_method:
            return

        if not self._current_model_info().support_controlnet():
            return

        del self.model
        torch_gc()

        self.kwargs["sd_controlnet_method"] = control_method
        try:
            self.model = self.init_model(
                self.name, switch_mps_device(self.name, self.device), **self.kwargs
            )
        except Exception:
            # Keep kwargs and the loaded model consistent with each other.
            self.kwargs["sd_controlnet_method"] = previous_method
            self._reload_if_unloaded()
            raise

        logger.info(
            f"Switch ControlNet method from {previous_method} to {control_method}"
        )

    def enable_disable_freeu(self, config: Config):
        """Turn FreeU on or off on the wrapped model according to ``config``.

        Skipped when the model device is ``mps`` or when the active model
        reports no FreeU support.
        """
        if str(self.model.device) == "mps":
            return

        if not self._current_model_info().support_freeu():
            return

        if not config.sd_freeu:
            self.model.model.disable_freeu()
            return

        freeu_config = config.sd_freeu_config
        self.model.model.enable_freeu(
            s1=freeu_config.s1,
            s2=freeu_config.s2,
            b1=freeu_config.b1,
            b2=freeu_config.b2,
        )

    def enable_disable_lcm_lora(self, config: Config):
        """Load, enable or disable the LCM LoRA according to ``config``.

        Skipped when the active model reports no LCM LoRA support. An adapter
        that was loaded and later disabled is re-enabled instead of being
        left off, and ``disable_lora()`` is only called when an adapter is
        actually loaded.
        """
        if not self._current_model_info().support_lcm_lora():
            return

        pipeline = self.model.model
        # NOTE(review): the previous code queried ``pipeline.pipe`` for the
        # adapter list but called every other LoRA method on ``pipeline``
        # itself; accept either layout - confirm which one the wrappers use.
        adapter_host = getattr(pipeline, "pipe", pipeline)
        adapters_loaded = bool(adapter_host.get_list_adapters())

        if config.sd_lcm_lora:
            if adapters_loaded:
                pipeline.enable_lora()
            else:
                pipeline.load_lora_weights(self.model.lcm_lora_id)
        elif adapters_loaded:
            pipeline.disable_lora()
|