fix test
This commit is contained in:
parent
3f6bc8fada
commit
fbb278298b
@ -4,8 +4,14 @@ import PIL.Image
|
||||
import cv2
|
||||
import numpy as np
|
||||
import torch
|
||||
from diffusers import PNDMScheduler, DDIMScheduler, LMSDiscreteScheduler, EulerDiscreteScheduler, \
|
||||
EulerAncestralDiscreteScheduler, DPMSolverMultistepScheduler
|
||||
from diffusers import (
|
||||
PNDMScheduler,
|
||||
DDIMScheduler,
|
||||
LMSDiscreteScheduler,
|
||||
EulerDiscreteScheduler,
|
||||
EulerAncestralDiscreteScheduler,
|
||||
DPMSolverMultistepScheduler,
|
||||
)
|
||||
from loguru import logger
|
||||
|
||||
from lama_cleaner.model.base import DiffusionInpaintModel
|
||||
@ -16,7 +22,7 @@ from lama_cleaner.schema import Config, SDSampler
|
||||
class CPUTextEncoderWrapper:
|
||||
def __init__(self, text_encoder, torch_dtype):
|
||||
self.config = text_encoder.config
|
||||
self.text_encoder = text_encoder.to(torch.device('cpu'), non_blocking=True)
|
||||
self.text_encoder = text_encoder.to(torch.device("cpu"), non_blocking=True)
|
||||
self.text_encoder = self.text_encoder.to(torch.float32, non_blocking=True)
|
||||
self.torch_dtype = torch_dtype
|
||||
del text_encoder
|
||||
@ -24,7 +30,15 @@ class CPUTextEncoderWrapper:
|
||||
|
||||
def __call__(self, x, **kwargs):
|
||||
input_device = x.device
|
||||
return [self.text_encoder(x.to(self.text_encoder.device), **kwargs)[0].to(input_device).to(self.torch_dtype)]
|
||||
return [
|
||||
self.text_encoder(x.to(self.text_encoder.device), **kwargs)[0]
|
||||
.to(input_device)
|
||||
.to(self.torch_dtype)
|
||||
]
|
||||
|
||||
@property
|
||||
def dtype(self):
|
||||
return self.torch_dtype
|
||||
|
||||
|
||||
class SD(DiffusionInpaintModel):
|
||||
@ -33,18 +47,23 @@ class SD(DiffusionInpaintModel):
|
||||
|
||||
def init_model(self, device: torch.device, **kwargs):
|
||||
from diffusers.pipelines.stable_diffusion import StableDiffusionInpaintPipeline
|
||||
fp16 = not kwargs.get('no_half', False)
|
||||
|
||||
model_kwargs = {"local_files_only": kwargs.get('local_files_only', kwargs['sd_run_local'])}
|
||||
if kwargs['disable_nsfw'] or kwargs.get('cpu_offload', False):
|
||||
fp16 = not kwargs.get("no_half", False)
|
||||
|
||||
model_kwargs = {
|
||||
"local_files_only": kwargs.get("local_files_only", kwargs["sd_run_local"])
|
||||
}
|
||||
if kwargs["disable_nsfw"] or kwargs.get("cpu_offload", False):
|
||||
logger.info("Disable Stable Diffusion Model NSFW checker")
|
||||
model_kwargs.update(dict(
|
||||
model_kwargs.update(
|
||||
dict(
|
||||
safety_checker=None,
|
||||
feature_extractor=None,
|
||||
requires_safety_checker=False
|
||||
))
|
||||
requires_safety_checker=False,
|
||||
)
|
||||
)
|
||||
|
||||
use_gpu = device == torch.device('cuda') and torch.cuda.is_available()
|
||||
use_gpu = device == torch.device("cuda") and torch.cuda.is_available()
|
||||
torch_dtype = torch.float16 if use_gpu and fp16 else torch.float32
|
||||
self.model = StableDiffusionInpaintPipeline.from_pretrained(
|
||||
self.model_id_or_path,
|
||||
@ -57,18 +76,20 @@ class SD(DiffusionInpaintModel):
|
||||
# https://huggingface.co/docs/diffusers/v0.7.0/en/api/pipelines/stable_diffusion#diffusers.StableDiffusionInpaintPipeline.enable_attention_slicing
|
||||
self.model.enable_attention_slicing()
|
||||
# https://huggingface.co/docs/diffusers/v0.7.0/en/optimization/fp16#memory-efficient-attention
|
||||
if kwargs.get('enable_xformers', False):
|
||||
if kwargs.get("enable_xformers", False):
|
||||
self.model.enable_xformers_memory_efficient_attention()
|
||||
|
||||
if kwargs.get('cpu_offload', False) and use_gpu:
|
||||
if kwargs.get("cpu_offload", False) and use_gpu:
|
||||
# TODO: gpu_id
|
||||
logger.info("Enable sequential cpu offload")
|
||||
self.model.enable_sequential_cpu_offload(gpu_id=0)
|
||||
else:
|
||||
self.model = self.model.to(device)
|
||||
if kwargs['sd_cpu_textencoder']:
|
||||
if kwargs["sd_cpu_textencoder"]:
|
||||
logger.info("Run Stable Diffusion TextEncoder on CPU")
|
||||
self.model.text_encoder = CPUTextEncoderWrapper(self.model.text_encoder, torch_dtype)
|
||||
self.model.text_encoder = CPUTextEncoderWrapper(
|
||||
self.model.text_encoder, torch_dtype
|
||||
)
|
||||
|
||||
self.callback = kwargs.pop("callback", None)
|
||||
|
||||
|
@ -1,17 +1,23 @@
|
||||
import io
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from lama_cleaner.helper import pil_to_bytes
|
||||
|
||||
|
||||
current_dir = Path(__file__).parent.absolute().resolve()
|
||||
png_img_p = current_dir / "image.png"
|
||||
jpg_img_p = current_dir / "bunny.jpeg"
|
||||
|
||||
|
||||
def print_exif(exif):
|
||||
for k, v in exif.items():
|
||||
print(f"{k}: {v}")
|
||||
|
||||
|
||||
def test_png():
|
||||
img = Image.open("image.png")
|
||||
img = Image.open(png_img_p)
|
||||
exif = img.getexif()
|
||||
print_exif(exif)
|
||||
|
||||
@ -24,7 +30,7 @@ def test_png():
|
||||
|
||||
|
||||
def test_jpeg():
|
||||
img = Image.open("bunny.jpeg")
|
||||
img = Image.open(jpg_img_p)
|
||||
exif = img.getexif()
|
||||
print_exif(exif)
|
||||
|
||||
|
@ -8,33 +8,37 @@ from lama_cleaner.schema import HDStrategy, SDSampler
|
||||
from lama_cleaner.tests.test_model import get_config, assert_equal
|
||||
|
||||
current_dir = Path(__file__).parent.absolute().resolve()
|
||||
save_dir = current_dir / 'result'
|
||||
save_dir = current_dir / "result"
|
||||
save_dir.mkdir(exist_ok=True, parents=True)
|
||||
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
device = torch.device(device)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sd_device", ['cuda'])
|
||||
@pytest.mark.parametrize("sd_device", ["cuda"])
|
||||
@pytest.mark.parametrize("strategy", [HDStrategy.ORIGINAL])
|
||||
@pytest.mark.parametrize("sampler", [SDSampler.ddim])
|
||||
@pytest.mark.parametrize("cpu_textencoder", [True, False])
|
||||
@pytest.mark.parametrize("disable_nsfw", [True, False])
|
||||
def test_runway_sd_1_5_ddim(sd_device, strategy, sampler, cpu_textencoder, disable_nsfw):
|
||||
def test_runway_sd_1_5_ddim(
|
||||
sd_device, strategy, sampler, cpu_textencoder, disable_nsfw
|
||||
):
|
||||
def callback(i, t, latents):
|
||||
print(f"sd_step_{i}")
|
||||
pass
|
||||
|
||||
if sd_device == 'cuda' and not torch.cuda.is_available():
|
||||
if sd_device == "cuda" and not torch.cuda.is_available():
|
||||
return
|
||||
|
||||
sd_steps = 50 if sd_device == 'cuda' else 1
|
||||
model = ModelManager(name="sd1.5",
|
||||
sd_steps = 50 if sd_device == "cuda" else 1
|
||||
model = ModelManager(
|
||||
name="sd1.5",
|
||||
device=torch.device(sd_device),
|
||||
hf_access_token="",
|
||||
sd_run_local=True,
|
||||
disable_nsfw=disable_nsfw,
|
||||
sd_cpu_textencoder=cpu_textencoder,
|
||||
callback=callback)
|
||||
cfg = get_config(strategy, prompt='a fox sitting on a bench', sd_steps=sd_steps)
|
||||
callback=callback,
|
||||
)
|
||||
cfg = get_config(strategy, prompt="a fox sitting on a bench", sd_steps=sd_steps)
|
||||
cfg.sd_sampler = sampler
|
||||
|
||||
name = f"device_{sd_device}_{sampler}_cpu_textencoder_{cpu_textencoder}_disnsfw_{disable_nsfw}"
|
||||
@ -45,31 +49,35 @@ def test_runway_sd_1_5_ddim(sd_device, strategy, sampler, cpu_textencoder, disab
|
||||
f"runway_sd_{strategy.capitalize()}_{name}.png",
|
||||
img_p=current_dir / "overture-creations-5sI6fQgYIuo.png",
|
||||
mask_p=current_dir / "overture-creations-5sI6fQgYIuo_mask.png",
|
||||
fx=1.3
|
||||
fx=1.3,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sd_device", ['cuda'])
|
||||
@pytest.mark.parametrize("sd_device", ["cuda"])
|
||||
@pytest.mark.parametrize("strategy", [HDStrategy.ORIGINAL])
|
||||
@pytest.mark.parametrize("sampler", [SDSampler.pndm, SDSampler.k_lms, SDSampler.k_euler, SDSampler.k_euler_a])
|
||||
@pytest.mark.parametrize(
|
||||
"sampler", [SDSampler.pndm, SDSampler.k_lms, SDSampler.k_euler, SDSampler.k_euler_a]
|
||||
)
|
||||
@pytest.mark.parametrize("cpu_textencoder", [False])
|
||||
@pytest.mark.parametrize("disable_nsfw", [True])
|
||||
def test_runway_sd_1_5(sd_device, strategy, sampler, cpu_textencoder, disable_nsfw):
|
||||
def callback(i, t, latents):
|
||||
print(f"sd_step_{i}")
|
||||
|
||||
if sd_device == 'cuda' and not torch.cuda.is_available():
|
||||
if sd_device == "cuda" and not torch.cuda.is_available():
|
||||
return
|
||||
|
||||
sd_steps = 50 if sd_device == 'cuda' else 1
|
||||
model = ModelManager(name="sd1.5",
|
||||
sd_steps = 50 if sd_device == "cuda" else 1
|
||||
model = ModelManager(
|
||||
name="sd1.5",
|
||||
device=torch.device(sd_device),
|
||||
hf_access_token="",
|
||||
sd_run_local=True,
|
||||
disable_nsfw=disable_nsfw,
|
||||
sd_cpu_textencoder=cpu_textencoder,
|
||||
callback=callback)
|
||||
cfg = get_config(strategy, prompt='a fox sitting on a bench', sd_steps=sd_steps)
|
||||
callback=callback,
|
||||
)
|
||||
cfg = get_config(strategy, prompt="a fox sitting on a bench", sd_steps=sd_steps)
|
||||
cfg.sd_sampler = sampler
|
||||
|
||||
name = f"device_{sd_device}_{sampler}_cpu_textencoder_{cpu_textencoder}_disnsfw_{disable_nsfw}"
|
||||
@ -80,35 +88,37 @@ def test_runway_sd_1_5(sd_device, strategy, sampler, cpu_textencoder, disable_ns
|
||||
f"runway_sd_{strategy.capitalize()}_{name}.png",
|
||||
img_p=current_dir / "overture-creations-5sI6fQgYIuo.png",
|
||||
mask_p=current_dir / "overture-creations-5sI6fQgYIuo_mask.png",
|
||||
fx=1.3
|
||||
fx=1.3,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sd_device", ['cuda'])
|
||||
@pytest.mark.parametrize("sd_device", ["cuda"])
|
||||
@pytest.mark.parametrize("strategy", [HDStrategy.ORIGINAL])
|
||||
@pytest.mark.parametrize("sampler", [SDSampler.ddim])
|
||||
def test_runway_sd_1_5_negative_prompt(sd_device, strategy, sampler):
|
||||
def callback(i, t, latents):
|
||||
pass
|
||||
|
||||
if sd_device == 'cuda' and not torch.cuda.is_available():
|
||||
if sd_device == "cuda" and not torch.cuda.is_available():
|
||||
return
|
||||
|
||||
sd_steps = 50 if sd_device == 'cuda' else 1
|
||||
model = ModelManager(name="sd1.5",
|
||||
sd_steps = 50 if sd_device == "cuda" else 1
|
||||
model = ModelManager(
|
||||
name="sd1.5",
|
||||
device=torch.device(sd_device),
|
||||
hf_access_token="",
|
||||
sd_run_local=True,
|
||||
disable_nsfw=False,
|
||||
sd_cpu_textencoder=False,
|
||||
callback=callback)
|
||||
callback=callback,
|
||||
)
|
||||
cfg = get_config(
|
||||
strategy,
|
||||
sd_steps=sd_steps,
|
||||
prompt='Face of a fox, high resolution, sitting on a park bench',
|
||||
negative_prompt='orange, yellow, small',
|
||||
prompt="Face of a fox, high resolution, sitting on a park bench",
|
||||
negative_prompt="orange, yellow, small",
|
||||
sd_sampler=sampler,
|
||||
sd_match_histograms=True
|
||||
sd_match_histograms=True,
|
||||
)
|
||||
|
||||
name = f"{sampler}_negative_prompt"
|
||||
@ -119,27 +129,33 @@ def test_runway_sd_1_5_negative_prompt(sd_device, strategy, sampler):
|
||||
f"runway_sd_{strategy.capitalize()}_{name}.png",
|
||||
img_p=current_dir / "overture-creations-5sI6fQgYIuo.png",
|
||||
mask_p=current_dir / "overture-creations-5sI6fQgYIuo_mask.png",
|
||||
fx=1
|
||||
fx=1,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sd_device", ['cuda'])
|
||||
@pytest.mark.parametrize("sd_device", ["cuda"])
|
||||
@pytest.mark.parametrize("strategy", [HDStrategy.ORIGINAL])
|
||||
@pytest.mark.parametrize("sampler", [SDSampler.k_euler_a])
|
||||
@pytest.mark.parametrize("cpu_textencoder", [False])
|
||||
@pytest.mark.parametrize("disable_nsfw", [False])
|
||||
def test_runway_sd_1_5_sd_scale(sd_device, strategy, sampler, cpu_textencoder, disable_nsfw):
|
||||
if sd_device == 'cuda' and not torch.cuda.is_available():
|
||||
def test_runway_sd_1_5_sd_scale(
|
||||
sd_device, strategy, sampler, cpu_textencoder, disable_nsfw
|
||||
):
|
||||
if sd_device == "cuda" and not torch.cuda.is_available():
|
||||
return
|
||||
|
||||
sd_steps = 50 if sd_device == 'cuda' else 1
|
||||
model = ModelManager(name="sd1.5",
|
||||
sd_steps = 50 if sd_device == "cuda" else 1
|
||||
model = ModelManager(
|
||||
name="sd1.5",
|
||||
device=torch.device(sd_device),
|
||||
hf_access_token="",
|
||||
sd_run_local=True,
|
||||
disable_nsfw=disable_nsfw,
|
||||
sd_cpu_textencoder=cpu_textencoder)
|
||||
cfg = get_config(strategy, prompt='a fox sitting on a bench', sd_steps=sd_steps, sd_scale=0.85)
|
||||
sd_cpu_textencoder=cpu_textencoder,
|
||||
)
|
||||
cfg = get_config(
|
||||
strategy, prompt="a fox sitting on a bench", sd_steps=sd_steps, sd_scale=0.85
|
||||
)
|
||||
cfg.sd_sampler = sampler
|
||||
|
||||
name = f"device_{sd_device}_{sampler}_cpu_textencoder_{cpu_textencoder}_disnsfw_{disable_nsfw}"
|
||||
@ -150,26 +166,30 @@ def test_runway_sd_1_5_sd_scale(sd_device, strategy, sampler, cpu_textencoder, d
|
||||
f"runway_sd_{strategy.capitalize()}_{name}_sdscale.png",
|
||||
img_p=current_dir / "overture-creations-5sI6fQgYIuo.png",
|
||||
mask_p=current_dir / "overture-creations-5sI6fQgYIuo_mask.png",
|
||||
fx=1.3
|
||||
fx=1.3,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sd_device", ['cuda'])
|
||||
@pytest.mark.parametrize("sd_device", ["cuda"])
|
||||
@pytest.mark.parametrize("strategy", [HDStrategy.ORIGINAL])
|
||||
@pytest.mark.parametrize("sampler", [SDSampler.k_euler_a])
|
||||
def test_runway_sd_1_5_cpu_offload(sd_device, strategy, sampler):
|
||||
if sd_device == 'cuda' and not torch.cuda.is_available():
|
||||
if sd_device == "cuda" and not torch.cuda.is_available():
|
||||
return
|
||||
|
||||
sd_steps = 50 if sd_device == 'cuda' else 1
|
||||
model = ModelManager(name="sd1.5",
|
||||
sd_steps = 50 if sd_device == "cuda" else 1
|
||||
model = ModelManager(
|
||||
name="sd1.5",
|
||||
device=torch.device(sd_device),
|
||||
hf_access_token="",
|
||||
sd_run_local=True,
|
||||
disable_nsfw=True,
|
||||
sd_cpu_textencoder=False,
|
||||
cpu_offload=True)
|
||||
cfg = get_config(strategy, prompt='a fox sitting on a bench', sd_steps=sd_steps, sd_scale=0.85)
|
||||
cpu_offload=True,
|
||||
)
|
||||
cfg = get_config(
|
||||
strategy, prompt="a fox sitting on a bench", sd_steps=sd_steps, sd_scale=0.85
|
||||
)
|
||||
cfg.sd_sampler = sampler
|
||||
|
||||
name = f"device_{sd_device}_{sampler}"
|
||||
@ -182,27 +202,3 @@ def test_runway_sd_1_5_cpu_offload(sd_device, strategy, sampler):
|
||||
mask_p=current_dir / "overture-creations-5sI6fQgYIuo_mask.png",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sd_device", ['cpu'])
|
||||
@pytest.mark.parametrize("strategy", [HDStrategy.ORIGINAL])
|
||||
@pytest.mark.parametrize("sampler", [SDSampler.k_euler_a])
|
||||
def test_runway_sd_1_5_cpu_offload_cpu_device(sd_device, strategy, sampler):
|
||||
model = ModelManager(name="sd1.5",
|
||||
device=torch.device(sd_device),
|
||||
hf_access_token="",
|
||||
sd_run_local=True,
|
||||
disable_nsfw=False,
|
||||
sd_cpu_textencoder=False,
|
||||
cpu_offload=True)
|
||||
cfg = get_config(strategy, prompt='a fox sitting on a bench', sd_steps=1, sd_scale=0.85)
|
||||
cfg.sd_sampler = sampler
|
||||
|
||||
name = f"device_{sd_device}_{sampler}"
|
||||
|
||||
assert_equal(
|
||||
model,
|
||||
cfg,
|
||||
f"runway_sd_{strategy.capitalize()}_{name}_cpu_offload_cpu_device.png",
|
||||
img_p=current_dir / "overture-creations-5sI6fQgYIuo.png",
|
||||
mask_p=current_dir / "overture-creations-5sI6fQgYIuo_mask.png",
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user