From 38c8837af7dc2f56dfc7e69ccdd53f013599749d Mon Sep 17 00:00:00 2001 From: Qing Date: Fri, 2 Sep 2022 10:37:30 +0800 Subject: [PATCH] Add FcF --- lama_cleaner/model/base.py | 3 +- lama_cleaner/model/fcf.py | 1180 ++++++++++++++++++++++++++++++ lama_cleaner/model/mat.py | 630 +--------------- lama_cleaner/model/utils.py | 623 ++++++++++++++++ lama_cleaner/model_manager.py | 4 +- lama_cleaner/tests/test_model.py | 27 +- 6 files changed, 1831 insertions(+), 636 deletions(-) create mode 100644 lama_cleaner/model/fcf.py diff --git a/lama_cleaner/model/base.py b/lama_cleaner/model/base.py index 93183cb..a287c3f 100644 --- a/lama_cleaner/model/base.py +++ b/lama_cleaner/model/base.py @@ -4,6 +4,7 @@ from typing import Optional import cv2 import torch from loguru import logger +import numpy as np from lama_cleaner.helper import boxes_from_mask, resize_max_size, pad_img_to_modulo from lama_cleaner.schema import Config, HDStrategy @@ -51,7 +52,7 @@ class InpaintModel: result = self.forward(pad_image, pad_mask, config) result = result[0:origin_height, 0:origin_width, :] - original_pixel_indices = mask != 255 + original_pixel_indices = mask < 127 result[original_pixel_indices] = image[:, :, ::-1][original_pixel_indices] return result diff --git a/lama_cleaner/model/fcf.py b/lama_cleaner/model/fcf.py new file mode 100644 index 0000000..c05b1ed --- /dev/null +++ b/lama_cleaner/model/fcf.py @@ -0,0 +1,1180 @@ +import os +import random + +import cv2 +import torch +import numpy as np +import torch.fft as fft + +from lama_cleaner.schema import Config + +from lama_cleaner.helper import load_model, get_cache_path_by_url, norm_img +from lama_cleaner.model.base import InpaintModel +from torch import conv2d, nn +import torch.nn.functional as F + +from lama_cleaner.model.utils import setup_filter, _parse_scaling, _parse_padding, Conv2dLayer, FullyConnectedLayer, \ + MinibatchStdLayer, activation_funcs, conv2d_resample, bias_act, upsample2d, normalize_2nd_moment, downsample2d + + +def upfirdn2d(x, f, up=1, down=1, padding=0, flip_filter=False, gain=1, impl='cuda'): + assert isinstance(x, torch.Tensor) + return _upfirdn2d_ref(x, f, up=up, down=down, padding=padding, flip_filter=flip_filter, gain=gain) + + +def _upfirdn2d_ref(x, f, up=1, down=1, padding=0, flip_filter=False, gain=1): + """Slow reference implementation of `upfirdn2d()` using standard PyTorch ops. + """ + # Validate arguments. + assert isinstance(x, torch.Tensor) and x.ndim == 4 + if f is None: + f = torch.ones([1, 1], dtype=torch.float32, device=x.device) + assert isinstance(f, torch.Tensor) and f.ndim in [1, 2] + assert f.dtype == torch.float32 and not f.requires_grad + batch_size, num_channels, in_height, in_width = x.shape + upx, upy = _parse_scaling(up) + downx, downy = _parse_scaling(down) + padx0, padx1, pady0, pady1 = _parse_padding(padding) + + # Upsample by inserting zeros. + x = x.reshape([batch_size, num_channels, in_height, 1, in_width, 1]) + x = torch.nn.functional.pad(x, [0, upx - 1, 0, 0, 0, upy - 1]) + x = x.reshape([batch_size, num_channels, in_height * upy, in_width * upx]) + + # Pad or crop. + x = torch.nn.functional.pad(x, [max(padx0, 0), max(padx1, 0), max(pady0, 0), max(pady1, 0)]) + x = x[:, :, max(-pady0, 0): x.shape[2] - max(-pady1, 0), max(-padx0, 0): x.shape[3] - max(-padx1, 0)] + + # Setup filter. + f = f * (gain ** (f.ndim / 2)) + f = f.to(x.dtype) + if not flip_filter: + f = f.flip(list(range(f.ndim))) + + # Convolve with the filter. 
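+    # The filter is broadcast to every channel and applied as a grouped (depthwise)
+    # convolution: a 2-D non-separable filter takes a single conv2d call, while a
+    # separable 1-D filter is applied twice, once along each spatial axis.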
+ f = f[np.newaxis, np.newaxis].repeat([num_channels, 1] + [1] * f.ndim) + if f.ndim == 4: + x = conv2d(input=x, weight=f, groups=num_channels) + else: + x = conv2d(input=x, weight=f.unsqueeze(2), groups=num_channels) + x = conv2d(input=x, weight=f.unsqueeze(3), groups=num_channels) + + # Downsample by throwing away pixels. + x = x[:, :, ::downy, ::downx] + return x + + +class EncoderEpilogue(torch.nn.Module): + def __init__(self, + in_channels, # Number of input channels. + cmap_dim, # Dimensionality of mapped conditioning label, 0 = no label. + z_dim, # Output Latent (Z) dimensionality. + resolution, # Resolution of this block. + img_channels, # Number of input color channels. + architecture='resnet', # Architecture: 'orig', 'skip', 'resnet'. + mbstd_group_size=4, # Group size for the minibatch standard deviation layer, None = entire minibatch. + mbstd_num_channels=1, # Number of features for the minibatch standard deviation layer, 0 = disable. + activation='lrelu', # Activation function: 'relu', 'lrelu', etc. + conv_clamp=None, # Clamp the output of convolution layers to +-X, None = disable clamping. + ): + assert architecture in ['orig', 'skip', 'resnet'] + super().__init__() + self.in_channels = in_channels + self.cmap_dim = cmap_dim + self.resolution = resolution + self.img_channels = img_channels + self.architecture = architecture + + if architecture == 'skip': + self.fromrgb = Conv2dLayer(self.img_channels, in_channels, kernel_size=1, activation=activation) + self.mbstd = MinibatchStdLayer(group_size=mbstd_group_size, + num_channels=mbstd_num_channels) if mbstd_num_channels > 0 else None + self.conv = Conv2dLayer(in_channels + mbstd_num_channels, in_channels, kernel_size=3, activation=activation, + conv_clamp=conv_clamp) + self.fc = FullyConnectedLayer(in_channels * (resolution ** 2), z_dim, activation=activation) + self.dropout = torch.nn.Dropout(p=0.5) + + def forward(self, x, cmap, force_fp32=False): + _ = force_fp32 # unused + dtype = torch.float32 + memory_format = torch.contiguous_format + + # FromRGB. + x = x.to(dtype=dtype, memory_format=memory_format) + + # Main layers. + if self.mbstd is not None: + x = self.mbstd(x) + const_e = self.conv(x) + x = self.fc(const_e.flatten(1)) + x = self.dropout(x) + + # Conditioning. + if self.cmap_dim > 0: + x = (x * cmap).sum(dim=1, keepdim=True) * (1 / np.sqrt(self.cmap_dim)) + + assert x.dtype == dtype + return x, const_e + + +class EncoderBlock(torch.nn.Module): + def __init__(self, + in_channels, # Number of input channels, 0 = first block. + tmp_channels, # Number of intermediate channels. + out_channels, # Number of output channels. + resolution, # Resolution of this block. + img_channels, # Number of input color channels. + first_layer_idx, # Index of the first layer. + architecture='skip', # Architecture: 'orig', 'skip', 'resnet'. + activation='lrelu', # Activation function: 'relu', 'lrelu', etc. + resample_filter=[1, 3, 3, 1], # Low-pass filter to apply when resampling activations. + conv_clamp=None, # Clamp the output of convolution layers to +-X, None = disable clamping. + use_fp16=False, # Use FP16 for this block? + fp16_channels_last=False, # Use channels-last memory format with FP16? + freeze_layers=0, # Freeze-D: Number of layers to freeze. 
+ ): + assert in_channels in [0, tmp_channels] + assert architecture in ['orig', 'skip', 'resnet'] + super().__init__() + self.in_channels = in_channels + self.resolution = resolution + self.img_channels = img_channels + 1 + self.first_layer_idx = first_layer_idx + self.architecture = architecture + self.use_fp16 = use_fp16 + self.channels_last = (use_fp16 and fp16_channels_last) + self.register_buffer('resample_filter', setup_filter(resample_filter)) + + self.num_layers = 0 + + def trainable_gen(): + while True: + layer_idx = self.first_layer_idx + self.num_layers + trainable = (layer_idx >= freeze_layers) + self.num_layers += 1 + yield trainable + + trainable_iter = trainable_gen() + + if in_channels == 0: + self.fromrgb = Conv2dLayer(self.img_channels, tmp_channels, kernel_size=1, activation=activation, + trainable=next(trainable_iter), conv_clamp=conv_clamp, + channels_last=self.channels_last) + + self.conv0 = Conv2dLayer(tmp_channels, tmp_channels, kernel_size=3, activation=activation, + trainable=next(trainable_iter), conv_clamp=conv_clamp, + channels_last=self.channels_last) + + self.conv1 = Conv2dLayer(tmp_channels, out_channels, kernel_size=3, activation=activation, down=2, + trainable=next(trainable_iter), resample_filter=resample_filter, conv_clamp=conv_clamp, + channels_last=self.channels_last) + + if architecture == 'resnet': + self.skip = Conv2dLayer(tmp_channels, out_channels, kernel_size=1, bias=False, down=2, + trainable=next(trainable_iter), resample_filter=resample_filter, + channels_last=self.channels_last) + + def forward(self, x, img, force_fp32=False): + # dtype = torch.float16 if self.use_fp16 and not force_fp32 else torch.float32 + dtype = torch.float32 + memory_format = torch.channels_last if self.channels_last and not force_fp32 else torch.contiguous_format + + # Input. + if x is not None: + x = x.to(dtype=dtype, memory_format=memory_format) + + # FromRGB. + if self.in_channels == 0: + img = img.to(dtype=dtype, memory_format=memory_format) + y = self.fromrgb(img) + x = x + y if x is not None else y + img = downsample2d(img, self.resample_filter) if self.architecture == 'skip' else None + + # Main layers. + if self.architecture == 'resnet': + y = self.skip(x, gain=np.sqrt(0.5)) + x = self.conv0(x) + feat = x.clone() + x = self.conv1(x, gain=np.sqrt(0.5)) + x = y.add_(x) + else: + x = self.conv0(x) + feat = x.clone() + x = self.conv1(x) + + assert x.dtype == dtype + return x, img, feat + + +class EncoderNetwork(torch.nn.Module): + def __init__(self, + c_dim, # Conditioning label (C) dimensionality. + z_dim, # Input latent (Z) dimensionality. + img_resolution, # Input resolution. + img_channels, # Number of input color channels. + architecture='orig', # Architecture: 'orig', 'skip', 'resnet'. + channel_base=16384, # Overall multiplier for the number of channels. + channel_max=512, # Maximum number of channels in any layer. + num_fp16_res=0, # Use FP16 for the N highest resolutions. + conv_clamp=None, # Clamp the output of convolution layers to +-X, None = disable clamping. + cmap_dim=None, # Dimensionality of mapped conditioning label, None = default. + block_kwargs={}, # Arguments for DiscriminatorBlock. + mapping_kwargs={}, # Arguments for MappingNetwork. + epilogue_kwargs={}, # Arguments for EncoderEpilogue. 
+ ): + super().__init__() + self.c_dim = c_dim + self.z_dim = z_dim + self.img_resolution = img_resolution + self.img_resolution_log2 = int(np.log2(img_resolution)) + self.img_channels = img_channels + self.block_resolutions = [2 ** i for i in range(self.img_resolution_log2, 2, -1)] + channels_dict = {res: min(channel_base // res, channel_max) for res in self.block_resolutions + [4]} + fp16_resolution = max(2 ** (self.img_resolution_log2 + 1 - num_fp16_res), 8) + + if cmap_dim is None: + cmap_dim = channels_dict[4] + if c_dim == 0: + cmap_dim = 0 + + common_kwargs = dict(img_channels=img_channels, architecture=architecture, conv_clamp=conv_clamp) + cur_layer_idx = 0 + for res in self.block_resolutions: + in_channels = channels_dict[res] if res < img_resolution else 0 + tmp_channels = channels_dict[res] + out_channels = channels_dict[res // 2] + use_fp16 = (res >= fp16_resolution) + use_fp16 = False + block = EncoderBlock(in_channels, tmp_channels, out_channels, resolution=res, + first_layer_idx=cur_layer_idx, use_fp16=use_fp16, **block_kwargs, **common_kwargs) + setattr(self, f'b{res}', block) + cur_layer_idx += block.num_layers + if c_dim > 0: + self.mapping = MappingNetwork(z_dim=0, c_dim=c_dim, w_dim=cmap_dim, num_ws=None, w_avg_beta=None, + **mapping_kwargs) + self.b4 = EncoderEpilogue(channels_dict[4], cmap_dim=cmap_dim, z_dim=z_dim * 2, resolution=4, **epilogue_kwargs, + **common_kwargs) + + def forward(self, img, c, **block_kwargs): + x = None + feats = {} + for res in self.block_resolutions: + block = getattr(self, f'b{res}') + x, img, feat = block(x, img, **block_kwargs) + feats[res] = feat + + cmap = None + if self.c_dim > 0: + cmap = self.mapping(None, c) + x, const_e = self.b4(x, cmap) + feats[4] = const_e + + B, _ = x.shape + z = torch.zeros((B, self.z_dim), requires_grad=False, dtype=x.dtype, + device=x.device) ## Noise for Co-Modulation + return x, z, feats + + +def fma(a, b, c): # => a * b + c + return _FusedMultiplyAdd.apply(a, b, c) + + +class _FusedMultiplyAdd(torch.autograd.Function): # a * b + c + @staticmethod + def forward(ctx, a, b, c): # pylint: disable=arguments-differ + out = torch.addcmul(c, a, b) + ctx.save_for_backward(a, b) + ctx.c_shape = c.shape + return out + + @staticmethod + def backward(ctx, dout): # pylint: disable=arguments-differ + a, b = ctx.saved_tensors + c_shape = ctx.c_shape + da = None + db = None + dc = None + + if ctx.needs_input_grad[0]: + da = _unbroadcast(dout * b, a.shape) + + if ctx.needs_input_grad[1]: + db = _unbroadcast(dout * a, b.shape) + + if ctx.needs_input_grad[2]: + dc = _unbroadcast(dout, c_shape) + + return da, db, dc + + +def _unbroadcast(x, shape): + extra_dims = x.ndim - len(shape) + assert extra_dims >= 0 + dim = [i for i in range(x.ndim) if x.shape[i] > 1 and (i < extra_dims or shape[i - extra_dims] == 1)] + if len(dim): + x = x.sum(dim=dim, keepdim=True) + if extra_dims: + x = x.reshape(-1, *x.shape[extra_dims + 1:]) + assert x.shape == shape + return x + + +def modulated_conv2d( + x, # Input tensor of shape [batch_size, in_channels, in_height, in_width]. + weight, # Weight tensor of shape [out_channels, in_channels, kernel_height, kernel_width]. + styles, # Modulation coefficients of shape [batch_size, in_channels]. + noise=None, # Optional noise tensor to add to the output activations. + up=1, # Integer upsampling factor. + down=1, # Integer downsampling factor. + padding=0, # Padding with respect to the upsampled image. + resample_filter=None, + # Low-pass filter to apply when resampling activations. 
Must be prepared beforehand by calling upfirdn2d.setup_filter(). + demodulate=True, # Apply weight demodulation? + flip_weight=True, # False = convolution, True = correlation (matches torch.nn.functional.conv2d). + fused_modconv=True, # Perform modulation, convolution, and demodulation as a single fused operation? +): + batch_size = x.shape[0] + out_channels, in_channels, kh, kw = weight.shape + + # Pre-normalize inputs to avoid FP16 overflow. + if x.dtype == torch.float16 and demodulate: + weight = weight * (1 / np.sqrt(in_channels * kh * kw) / weight.norm(float('inf'), dim=[1, 2, 3], + keepdim=True)) # max_Ikk + styles = styles / styles.norm(float('inf'), dim=1, keepdim=True) # max_I + + # Calculate per-sample weights and demodulation coefficients. + w = None + dcoefs = None + if demodulate or fused_modconv: + w = weight.unsqueeze(0) # [NOIkk] + w = w * styles.reshape(batch_size, 1, -1, 1, 1) # [NOIkk] + if demodulate: + dcoefs = (w.square().sum(dim=[2, 3, 4]) + 1e-8).rsqrt() # [NO] + if demodulate and fused_modconv: + w = w * dcoefs.reshape(batch_size, -1, 1, 1, 1) # [NOIkk] + # Execute by scaling the activations before and after the convolution. + if not fused_modconv: + x = x * styles.to(x.dtype).reshape(batch_size, -1, 1, 1) + x = conv2d_resample.conv2d_resample(x=x, w=weight.to(x.dtype), f=resample_filter, up=up, down=down, + padding=padding, flip_weight=flip_weight) + if demodulate and noise is not None: + x = fma(x, dcoefs.to(x.dtype).reshape(batch_size, -1, 1, 1), noise.to(x.dtype)) + elif demodulate: + x = x * dcoefs.to(x.dtype).reshape(batch_size, -1, 1, 1) + elif noise is not None: + x = x.add_(noise.to(x.dtype)) + return x + + # Execute as one fused op using grouped convolution. + batch_size = int(batch_size) + x = x.reshape(1, -1, *x.shape[2:]) + w = w.reshape(-1, in_channels, kh, kw) + x = conv2d_resample(x=x, w=w.to(x.dtype), f=resample_filter, up=up, down=down, padding=padding, + groups=batch_size, flip_weight=flip_weight) + x = x.reshape(batch_size, -1, *x.shape[2:]) + if noise is not None: + x = x.add_(noise) + return x + + +class SynthesisLayer(torch.nn.Module): + def __init__(self, + in_channels, # Number of input channels. + out_channels, # Number of output channels. + w_dim, # Intermediate latent (W) dimensionality. + resolution, # Resolution of this layer. + kernel_size=3, # Convolution kernel size. + up=1, # Integer upsampling factor. + use_noise=True, # Enable noise input? + activation='lrelu', # Activation function: 'relu', 'lrelu', etc. + resample_filter=[1, 3, 3, 1], # Low-pass filter to apply when resampling activations. + conv_clamp=None, # Clamp the output of convolution layers to +-X, None = disable clamping. + channels_last=False, # Use channels_last format for the weights? 
+ ): + super().__init__() + self.resolution = resolution + self.up = up + self.use_noise = use_noise + self.activation = activation + self.conv_clamp = conv_clamp + self.register_buffer('resample_filter', setup_filter(resample_filter)) + self.padding = kernel_size // 2 + self.act_gain = activation_funcs[activation].def_gain + + self.affine = FullyConnectedLayer(w_dim, in_channels, bias_init=1) + memory_format = torch.channels_last if channels_last else torch.contiguous_format + self.weight = torch.nn.Parameter( + torch.randn([out_channels, in_channels, kernel_size, kernel_size]).to(memory_format=memory_format)) + if use_noise: + self.register_buffer('noise_const', torch.randn([resolution, resolution])) + self.noise_strength = torch.nn.Parameter(torch.zeros([])) + self.bias = torch.nn.Parameter(torch.zeros([out_channels])) + + def forward(self, x, w, noise_mode='none', fused_modconv=True, gain=1): + assert noise_mode in ['random', 'const', 'none'] + in_resolution = self.resolution // self.up + styles = self.affine(w) + + noise = None + if self.use_noise and noise_mode == 'random': + noise = torch.randn([x.shape[0], 1, self.resolution, self.resolution], + device=x.device) * self.noise_strength + if self.use_noise and noise_mode == 'const': + noise = self.noise_const * self.noise_strength + + flip_weight = (self.up == 1) # slightly faster + x = modulated_conv2d(x=x, weight=self.weight, styles=styles, noise=noise, up=self.up, + padding=self.padding, resample_filter=self.resample_filter, flip_weight=flip_weight, + fused_modconv=fused_modconv) + + act_gain = self.act_gain * gain + act_clamp = self.conv_clamp * gain if self.conv_clamp is not None else None + x = F.leaky_relu(x, negative_slope=0.2, inplace=False) + if act_gain != 1: + x = x * act_gain + if act_clamp is not None: + x = x.clamp(-act_clamp, act_clamp) + return x + + +class ToRGBLayer(torch.nn.Module): + def __init__(self, in_channels, out_channels, w_dim, kernel_size=1, conv_clamp=None, channels_last=False): + super().__init__() + self.conv_clamp = conv_clamp + self.affine = FullyConnectedLayer(w_dim, in_channels, bias_init=1) + memory_format = torch.channels_last if channels_last else torch.contiguous_format + self.weight = torch.nn.Parameter( + torch.randn([out_channels, in_channels, kernel_size, kernel_size]).to(memory_format=memory_format)) + self.bias = torch.nn.Parameter(torch.zeros([out_channels])) + self.weight_gain = 1 / np.sqrt(in_channels * (kernel_size ** 2)) + + def forward(self, x, w, fused_modconv=True): + styles = self.affine(w) * self.weight_gain + x = modulated_conv2d(x=x, weight=self.weight, styles=styles, demodulate=False, fused_modconv=fused_modconv) + x = bias_act(x, self.bias.to(x.dtype), clamp=self.conv_clamp) + return x + + +class SynthesisForeword(torch.nn.Module): + def __init__(self, + z_dim, # Output Latent (Z) dimensionality. + resolution, # Resolution of this block. + in_channels, + img_channels, # Number of input color channels. + architecture='skip', # Architecture: 'orig', 'skip', 'resnet'. + activation='lrelu', # Activation function: 'relu', 'lrelu', etc. 
+ + ): + super().__init__() + self.in_channels = in_channels + self.z_dim = z_dim + self.resolution = resolution + self.img_channels = img_channels + self.architecture = architecture + + self.fc = FullyConnectedLayer(self.z_dim, (self.z_dim // 2) * 4 * 4, activation=activation) + self.conv = SynthesisLayer(self.in_channels, self.in_channels, w_dim=(z_dim // 2) * 3, resolution=4) + + if architecture == 'skip': + self.torgb = ToRGBLayer(self.in_channels, self.img_channels, kernel_size=1, w_dim=(z_dim // 2) * 3) + + def forward(self, x, ws, feats, img, force_fp32=False): + _ = force_fp32 # unused + dtype = torch.float32 + memory_format = torch.contiguous_format + + x_global = x.clone() + # ToRGB. + x = self.fc(x) + x = x.view(-1, self.z_dim // 2, 4, 4) + x = x.to(dtype=dtype, memory_format=memory_format) + + # Main layers. + x_skip = feats[4].clone() + x = x + x_skip + + mod_vector = [] + mod_vector.append(ws[:, 0]) + mod_vector.append(x_global.clone()) + mod_vector = torch.cat(mod_vector, dim=1) + + x = self.conv(x, mod_vector) + + mod_vector = [] + mod_vector.append(ws[:, 2 * 2 - 3]) + mod_vector.append(x_global.clone()) + mod_vector = torch.cat(mod_vector, dim=1) + + if self.architecture == 'skip': + img = self.torgb(x, mod_vector) + img = img.to(dtype=torch.float32, memory_format=torch.contiguous_format) + + assert x.dtype == dtype + return x, img + + +class SELayer(nn.Module): + def __init__(self, channel, reduction=16): + super(SELayer, self).__init__() + self.avg_pool = nn.AdaptiveAvgPool2d(1) + self.fc = nn.Sequential( + nn.Linear(channel, channel // reduction, bias=False), + nn.ReLU(inplace=False), + nn.Linear(channel // reduction, channel, bias=False), + nn.Sigmoid() + ) + + def forward(self, x): + b, c, _, _ = x.size() + y = self.avg_pool(x).view(b, c) + y = self.fc(y).view(b, c, 1, 1) + res = x * y.expand_as(x) + return res + + +class FourierUnit(nn.Module): + + def __init__(self, in_channels, out_channels, groups=1, spatial_scale_factor=None, spatial_scale_mode='bilinear', + spectral_pos_encoding=False, use_se=False, se_kwargs=None, ffc3d=False, fft_norm='ortho'): + # bn_layer not used + super(FourierUnit, self).__init__() + self.groups = groups + + self.conv_layer = torch.nn.Conv2d(in_channels=in_channels * 2 + (2 if spectral_pos_encoding else 0), + out_channels=out_channels * 2, + kernel_size=1, stride=1, padding=0, groups=self.groups, bias=False) + self.relu = torch.nn.ReLU(inplace=False) + + # squeeze and excitation block + self.use_se = use_se + if use_se: + if se_kwargs is None: + se_kwargs = {} + self.se = SELayer(self.conv_layer.in_channels, **se_kwargs) + + self.spatial_scale_factor = spatial_scale_factor + self.spatial_scale_mode = spatial_scale_mode + self.spectral_pos_encoding = spectral_pos_encoding + self.ffc3d = ffc3d + self.fft_norm = fft_norm + + def forward(self, x): + batch = x.shape[0] + + if self.spatial_scale_factor is not None: + orig_size = x.shape[-2:] + x = F.interpolate(x, scale_factor=self.spatial_scale_factor, mode=self.spatial_scale_mode, + align_corners=False) + + r_size = x.size() + # (batch, c, h, w/2+1, 2) + fft_dim = (-3, -2, -1) if self.ffc3d else (-2, -1) + ffted = fft.rfftn(x, dim=fft_dim, norm=self.fft_norm) + ffted = torch.stack((ffted.real, ffted.imag), dim=-1) + ffted = ffted.permute(0, 1, 4, 2, 3).contiguous() # (batch, c, 2, h, w/2+1) + ffted = ffted.view((batch, -1,) + ffted.size()[3:]) + + if self.spectral_pos_encoding: + height, width = ffted.shape[-2:] + coords_vert = torch.linspace(0, 1, height)[None, None, :, None].expand(batch, 
1, height, width).to(ffted) + coords_hor = torch.linspace(0, 1, width)[None, None, None, :].expand(batch, 1, height, width).to(ffted) + ffted = torch.cat((coords_vert, coords_hor, ffted), dim=1) + + if self.use_se: + ffted = self.se(ffted) + + ffted = self.conv_layer(ffted) # (batch, c*2, h, w/2+1) + ffted = self.relu(ffted) + + ffted = ffted.view((batch, -1, 2,) + ffted.size()[2:]).permute( + 0, 1, 3, 4, 2).contiguous() # (batch,c, t, h, w/2+1, 2) + ffted = torch.complex(ffted[..., 0], ffted[..., 1]) + + ifft_shape_slice = x.shape[-3:] if self.ffc3d else x.shape[-2:] + output = torch.fft.irfftn(ffted, s=ifft_shape_slice, dim=fft_dim, norm=self.fft_norm) + + if self.spatial_scale_factor is not None: + output = F.interpolate(output, size=orig_size, mode=self.spatial_scale_mode, align_corners=False) + + return output + + +class SpectralTransform(nn.Module): + + def __init__(self, in_channels, out_channels, stride=1, groups=1, enable_lfu=True, **fu_kwargs): + # bn_layer not used + super(SpectralTransform, self).__init__() + self.enable_lfu = enable_lfu + if stride == 2: + self.downsample = nn.AvgPool2d(kernel_size=(2, 2), stride=2) + else: + self.downsample = nn.Identity() + + self.stride = stride + self.conv1 = nn.Sequential( + nn.Conv2d(in_channels, out_channels // + 2, kernel_size=1, groups=groups, bias=False), + # nn.BatchNorm2d(out_channels // 2), + nn.ReLU(inplace=True) + ) + self.fu = FourierUnit( + out_channels // 2, out_channels // 2, groups, **fu_kwargs) + if self.enable_lfu: + self.lfu = FourierUnit( + out_channels // 2, out_channels // 2, groups) + self.conv2 = torch.nn.Conv2d( + out_channels // 2, out_channels, kernel_size=1, groups=groups, bias=False) + + def forward(self, x): + + x = self.downsample(x) + x = self.conv1(x) + output = self.fu(x) + + if self.enable_lfu: + n, c, h, w = x.shape + split_no = 2 + split_s = h // split_no + xs = torch.cat(torch.split( + x[:, :c // 4], split_s, dim=-2), dim=1).contiguous() + xs = torch.cat(torch.split(xs, split_s, dim=-1), + dim=1).contiguous() + xs = self.lfu(xs) + xs = xs.repeat(1, 1, split_no, split_no).contiguous() + else: + xs = 0 + + output = self.conv2(x + output + xs) + + return output + + +class FFC(nn.Module): + + def __init__(self, in_channels, out_channels, kernel_size, + ratio_gin, ratio_gout, stride=1, padding=0, + dilation=1, groups=1, bias=False, enable_lfu=True, + padding_type='reflect', gated=False, **spectral_kwargs): + super(FFC, self).__init__() + + assert stride == 1 or stride == 2, "Stride should be 1 or 2." 
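+        # The channels are split into a local branch (ordinary convolutions) and a
+        # global branch (SpectralTransform, i.e. convolution in the Fourier domain);
+        # ratio_gin / ratio_gout set the fraction of channels routed through the global path.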
+ self.stride = stride + + in_cg = int(in_channels * ratio_gin) + in_cl = in_channels - in_cg + out_cg = int(out_channels * ratio_gout) + out_cl = out_channels - out_cg + # groups_g = 1 if groups == 1 else int(groups * ratio_gout) + # groups_l = 1 if groups == 1 else groups - groups_g + + self.ratio_gin = ratio_gin + self.ratio_gout = ratio_gout + self.global_in_num = in_cg + + module = nn.Identity if in_cl == 0 or out_cl == 0 else nn.Conv2d + self.convl2l = module(in_cl, out_cl, kernel_size, + stride, padding, dilation, groups, bias, padding_mode=padding_type) + module = nn.Identity if in_cl == 0 or out_cg == 0 else nn.Conv2d + self.convl2g = module(in_cl, out_cg, kernel_size, + stride, padding, dilation, groups, bias, padding_mode=padding_type) + module = nn.Identity if in_cg == 0 or out_cl == 0 else nn.Conv2d + self.convg2l = module(in_cg, out_cl, kernel_size, + stride, padding, dilation, groups, bias, padding_mode=padding_type) + module = nn.Identity if in_cg == 0 or out_cg == 0 else SpectralTransform + self.convg2g = module( + in_cg, out_cg, stride, 1 if groups == 1 else groups // 2, enable_lfu, **spectral_kwargs) + + self.gated = gated + module = nn.Identity if in_cg == 0 or out_cl == 0 or not self.gated else nn.Conv2d + self.gate = module(in_channels, 2, 1) + + def forward(self, x, fname=None): + x_l, x_g = x if type(x) is tuple else (x, 0) + out_xl, out_xg = 0, 0 + + if self.gated: + total_input_parts = [x_l] + if torch.is_tensor(x_g): + total_input_parts.append(x_g) + total_input = torch.cat(total_input_parts, dim=1) + + gates = torch.sigmoid(self.gate(total_input)) + g2l_gate, l2g_gate = gates.chunk(2, dim=1) + else: + g2l_gate, l2g_gate = 1, 1 + + spec_x = self.convg2g(x_g) + + if self.ratio_gout != 1: + out_xl = self.convl2l(x_l) + self.convg2l(x_g) * g2l_gate + if self.ratio_gout != 0: + out_xg = self.convl2g(x_l) * l2g_gate + spec_x + + return out_xl, out_xg + + +class FFC_BN_ACT(nn.Module): + + def __init__(self, in_channels, out_channels, + kernel_size, ratio_gin, ratio_gout, + stride=1, padding=0, dilation=1, groups=1, bias=False, + norm_layer=nn.SyncBatchNorm, activation_layer=nn.Identity, + padding_type='reflect', + enable_lfu=True, **kwargs): + super(FFC_BN_ACT, self).__init__() + self.ffc = FFC(in_channels, out_channels, kernel_size, + ratio_gin, ratio_gout, stride, padding, dilation, + groups, bias, enable_lfu, padding_type=padding_type, **kwargs) + lnorm = nn.Identity if ratio_gout == 1 else norm_layer + gnorm = nn.Identity if ratio_gout == 0 else norm_layer + global_channels = int(out_channels * ratio_gout) + # self.bn_l = lnorm(out_channels - global_channels) + # self.bn_g = gnorm(global_channels) + + lact = nn.Identity if ratio_gout == 1 else activation_layer + gact = nn.Identity if ratio_gout == 0 else activation_layer + self.act_l = lact(inplace=True) + self.act_g = gact(inplace=True) + + def forward(self, x, fname=None): + x_l, x_g = self.ffc(x, fname=fname, ) + x_l = self.act_l(x_l) + x_g = self.act_g(x_g) + return x_l, x_g + + +class FFCResnetBlock(nn.Module): + def __init__(self, dim, padding_type, norm_layer, activation_layer=nn.ReLU, dilation=1, + spatial_transform_kwargs=None, inline=False, ratio_gin=0.75, ratio_gout=0.75): + super().__init__() + self.conv1 = FFC_BN_ACT(dim, dim, kernel_size=3, padding=dilation, dilation=dilation, + norm_layer=norm_layer, + activation_layer=activation_layer, + padding_type=padding_type, + ratio_gin=ratio_gin, ratio_gout=ratio_gout) + self.conv2 = FFC_BN_ACT(dim, dim, kernel_size=3, padding=dilation, dilation=dilation, + 
norm_layer=norm_layer, + activation_layer=activation_layer, + padding_type=padding_type, + ratio_gin=ratio_gin, ratio_gout=ratio_gout) + self.inline = inline + + def forward(self, x, fname=None): + if self.inline: + x_l, x_g = x[:, :-self.conv1.ffc.global_in_num], x[:, -self.conv1.ffc.global_in_num:] + else: + x_l, x_g = x if type(x) is tuple else (x, 0) + + id_l, id_g = x_l, x_g + + x_l, x_g = self.conv1((x_l, x_g), fname=fname) + x_l, x_g = self.conv2((x_l, x_g), fname=fname) + + x_l, x_g = id_l + x_l, id_g + x_g + out = x_l, x_g + if self.inline: + out = torch.cat(out, dim=1) + return out + + +class ConcatTupleLayer(nn.Module): + def forward(self, x): + assert isinstance(x, tuple) + x_l, x_g = x + assert torch.is_tensor(x_l) or torch.is_tensor(x_g) + if not torch.is_tensor(x_g): + return x_l + return torch.cat(x, dim=1) + + +class FFCBlock(torch.nn.Module): + def __init__(self, + dim, # Number of output/input channels. + kernel_size, # Width and height of the convolution kernel. + padding, + ratio_gin=0.75, + ratio_gout=0.75, + activation='linear', # Activation function: 'relu', 'lrelu', etc. + ): + super().__init__() + if activation == 'linear': + self.activation = nn.Identity + else: + self.activation = nn.ReLU + self.padding = padding + self.kernel_size = kernel_size + self.ffc_block = FFCResnetBlock(dim=dim, + padding_type='reflect', + norm_layer=nn.SyncBatchNorm, + activation_layer=self.activation, + dilation=1, + ratio_gin=ratio_gin, + ratio_gout=ratio_gout) + + self.concat_layer = ConcatTupleLayer() + + def forward(self, gen_ft, mask, fname=None): + x = gen_ft.float() + + x_l, x_g = x[:, :-self.ffc_block.conv1.ffc.global_in_num], x[:, -self.ffc_block.conv1.ffc.global_in_num:] + id_l, id_g = x_l, x_g + + x_l, x_g = self.ffc_block((x_l, x_g), fname=fname) + x_l, x_g = id_l + x_l, id_g + x_g + x = self.concat_layer((x_l, x_g)) + + return x + gen_ft.float() + + +class FFCSkipLayer(torch.nn.Module): + def __init__(self, + dim, # Number of input/output channels. + kernel_size=3, # Convolution kernel size. + ratio_gin=0.75, + ratio_gout=0.75, + ): + super().__init__() + self.padding = kernel_size // 2 + + self.ffc_act = FFCBlock(dim=dim, kernel_size=kernel_size, activation=nn.ReLU, + padding=self.padding, ratio_gin=ratio_gin, ratio_gout=ratio_gout) + + def forward(self, gen_ft, mask, fname=None): + x = self.ffc_act(gen_ft, mask, fname=fname) + return x + + +class SynthesisBlock(torch.nn.Module): + def __init__(self, + in_channels, # Number of input channels, 0 = first block. + out_channels, # Number of output channels. + w_dim, # Intermediate latent (W) dimensionality. + resolution, # Resolution of this block. + img_channels, # Number of output color channels. + is_last, # Is this the last block? + architecture='skip', # Architecture: 'orig', 'skip', 'resnet'. + resample_filter=[1, 3, 3, 1], # Low-pass filter to apply when resampling activations. + conv_clamp=None, # Clamp the output of convolution layers to +-X, None = disable clamping. + use_fp16=False, # Use FP16 for this block? + fp16_channels_last=False, # Use channels-last memory format with FP16? + **layer_kwargs, # Arguments for SynthesisLayer. 
+ ): + assert architecture in ['orig', 'skip', 'resnet'] + super().__init__() + self.in_channels = in_channels + self.w_dim = w_dim + self.resolution = resolution + self.img_channels = img_channels + self.is_last = is_last + self.architecture = architecture + self.use_fp16 = use_fp16 + self.channels_last = (use_fp16 and fp16_channels_last) + self.register_buffer('resample_filter', setup_filter(resample_filter)) + self.num_conv = 0 + self.num_torgb = 0 + self.res_ffc = {4: 0, 8: 0, 16: 0, 32: 1, 64: 1, 128: 1, 256: 1, 512: 1} + + if in_channels != 0 and resolution >= 8: + self.ffc_skip = nn.ModuleList() + for _ in range(self.res_ffc[resolution]): + self.ffc_skip.append(FFCSkipLayer(dim=out_channels)) + + if in_channels == 0: + self.const = torch.nn.Parameter(torch.randn([out_channels, resolution, resolution])) + + if in_channels != 0: + self.conv0 = SynthesisLayer(in_channels, out_channels, w_dim=w_dim * 3, resolution=resolution, up=2, + resample_filter=resample_filter, conv_clamp=conv_clamp, + channels_last=self.channels_last, **layer_kwargs) + self.num_conv += 1 + + self.conv1 = SynthesisLayer(out_channels, out_channels, w_dim=w_dim * 3, resolution=resolution, + conv_clamp=conv_clamp, channels_last=self.channels_last, **layer_kwargs) + self.num_conv += 1 + + if is_last or architecture == 'skip': + self.torgb = ToRGBLayer(out_channels, img_channels, w_dim=w_dim * 3, + conv_clamp=conv_clamp, channels_last=self.channels_last) + self.num_torgb += 1 + + if in_channels != 0 and architecture == 'resnet': + self.skip = Conv2dLayer(in_channels, out_channels, kernel_size=1, bias=False, up=2, + resample_filter=resample_filter, channels_last=self.channels_last) + + def forward(self, x, mask, feats, img, ws, fname=None, force_fp32=False, fused_modconv=None, **layer_kwargs): + dtype = torch.float16 if self.use_fp16 and not force_fp32 else torch.float32 + dtype = torch.float32 + memory_format = torch.channels_last if self.channels_last and not force_fp32 else torch.contiguous_format + if fused_modconv is None: + fused_modconv = (not self.training) and (dtype == torch.float32 or int(x.shape[0]) == 1) + + x = x.to(dtype=dtype, memory_format=memory_format) + x_skip = feats[self.resolution].clone().to(dtype=dtype, memory_format=memory_format) + + # Main layers. + if self.in_channels == 0: + x = self.conv1(x, ws[1], fused_modconv=fused_modconv, **layer_kwargs) + elif self.architecture == 'resnet': + y = self.skip(x, gain=np.sqrt(0.5)) + x = self.conv0(x, ws[0].clone(), fused_modconv=fused_modconv, **layer_kwargs) + if len(self.ffc_skip) > 0: + mask = F.interpolate(mask, size=x_skip.shape[2:], ) + z = x + x_skip + for fres in self.ffc_skip: + z = fres(z, mask) + x = x + z + else: + x = x + x_skip + x = self.conv1(x, ws[1].clone(), fused_modconv=fused_modconv, gain=np.sqrt(0.5), **layer_kwargs) + x = y.add_(x) + else: + x = self.conv0(x, ws[0].clone(), fused_modconv=fused_modconv, **layer_kwargs) + if len(self.ffc_skip) > 0: + mask = F.interpolate(mask, size=x_skip.shape[2:], ) + z = x + x_skip + for fres in self.ffc_skip: + z = fres(z, mask) + x = x + z + else: + x = x + x_skip + x = self.conv1(x, ws[1].clone(), fused_modconv=fused_modconv, **layer_kwargs) + # ToRGB. 
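+        # Upsample the RGB image accumulated from earlier blocks, then add this block's
+        # own contribution when using the 'skip' architecture or at the final block.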
+ if img is not None: + img = upsample2d(img, self.resample_filter) + if self.is_last or self.architecture == 'skip': + y = self.torgb(x, ws[2].clone(), fused_modconv=fused_modconv) + y = y.to(dtype=torch.float32, memory_format=torch.contiguous_format) + img = img.add_(y) if img is not None else y + + x = x.to(dtype=dtype) + assert x.dtype == dtype + assert img is None or img.dtype == torch.float32 + return x, img + + +class SynthesisNetwork(torch.nn.Module): + def __init__(self, + w_dim, # Intermediate latent (W) dimensionality. + z_dim, # Output Latent (Z) dimensionality. + img_resolution, # Output image resolution. + img_channels, # Number of color channels. + channel_base=16384, # Overall multiplier for the number of channels. + channel_max=512, # Maximum number of channels in any layer. + num_fp16_res=0, # Use FP16 for the N highest resolutions. + **block_kwargs, # Arguments for SynthesisBlock. + ): + assert img_resolution >= 4 and img_resolution & (img_resolution - 1) == 0 + super().__init__() + self.w_dim = w_dim + self.img_resolution = img_resolution + self.img_resolution_log2 = int(np.log2(img_resolution)) + self.img_channels = img_channels + self.block_resolutions = [2 ** i for i in range(3, self.img_resolution_log2 + 1)] + channels_dict = {res: min(channel_base // res, channel_max) for res in self.block_resolutions} + fp16_resolution = max(2 ** (self.img_resolution_log2 + 1 - num_fp16_res), 8) + + self.foreword = SynthesisForeword(img_channels=img_channels, in_channels=min(channel_base // 4, channel_max), + z_dim=z_dim * 2, resolution=4) + + self.num_ws = self.img_resolution_log2 * 2 - 2 + for res in self.block_resolutions: + if res // 2 in channels_dict.keys(): + in_channels = channels_dict[res // 2] if res > 4 else 0 + else: + in_channels = min(channel_base // (res // 2), channel_max) + out_channels = channels_dict[res] + use_fp16 = (res >= fp16_resolution) + use_fp16 = False + is_last = (res == self.img_resolution) + block = SynthesisBlock(in_channels, out_channels, w_dim=w_dim, resolution=res, + img_channels=img_channels, is_last=is_last, use_fp16=use_fp16, **block_kwargs) + setattr(self, f'b{res}', block) + + def forward(self, x_global, mask, feats, ws, fname=None, **block_kwargs): + + img = None + + x, img = self.foreword(x_global, ws, feats, img) + + for res in self.block_resolutions: + block = getattr(self, f'b{res}') + mod_vector0 = [] + mod_vector0.append(ws[:, int(np.log2(res)) * 2 - 5]) + mod_vector0.append(x_global.clone()) + mod_vector0 = torch.cat(mod_vector0, dim=1) + + mod_vector1 = [] + mod_vector1.append(ws[:, int(np.log2(res)) * 2 - 4]) + mod_vector1.append(x_global.clone()) + mod_vector1 = torch.cat(mod_vector1, dim=1) + + mod_vector_rgb = [] + mod_vector_rgb.append(ws[:, int(np.log2(res)) * 2 - 3]) + mod_vector_rgb.append(x_global.clone()) + mod_vector_rgb = torch.cat(mod_vector_rgb, dim=1) + x, img = block(x, mask, feats, img, (mod_vector0, mod_vector1, mod_vector_rgb), fname=fname, **block_kwargs) + return img + + +class MappingNetwork(torch.nn.Module): + def __init__(self, + z_dim, # Input latent (Z) dimensionality, 0 = no latent. + c_dim, # Conditioning label (C) dimensionality, 0 = no label. + w_dim, # Intermediate latent (W) dimensionality. + num_ws, # Number of intermediate latents to output, None = do not broadcast. + num_layers=8, # Number of mapping layers. + embed_features=None, # Label embedding dimensionality, None = same as w_dim. + layer_features=None, # Number of intermediate features in the mapping layers, None = same as w_dim. 
+ activation='lrelu', # Activation function: 'relu', 'lrelu', etc. + lr_multiplier=0.01, # Learning rate multiplier for the mapping layers. + w_avg_beta=0.995, # Decay for tracking the moving average of W during training, None = do not track. + ): + super().__init__() + self.z_dim = z_dim + self.c_dim = c_dim + self.w_dim = w_dim + self.num_ws = num_ws + self.num_layers = num_layers + self.w_avg_beta = w_avg_beta + + if embed_features is None: + embed_features = w_dim + if c_dim == 0: + embed_features = 0 + if layer_features is None: + layer_features = w_dim + features_list = [z_dim + embed_features] + [layer_features] * (num_layers - 1) + [w_dim] + + if c_dim > 0: + self.embed = FullyConnectedLayer(c_dim, embed_features) + for idx in range(num_layers): + in_features = features_list[idx] + out_features = features_list[idx + 1] + layer = FullyConnectedLayer(in_features, out_features, activation=activation, lr_multiplier=lr_multiplier) + setattr(self, f'fc{idx}', layer) + + if num_ws is not None and w_avg_beta is not None: + self.register_buffer('w_avg', torch.zeros([w_dim])) + + def forward(self, z, c, truncation_psi=1, truncation_cutoff=None, skip_w_avg_update=False): + # Embed, normalize, and concat inputs. + x = None + with torch.autograd.profiler.record_function('input'): + if self.z_dim > 0: + x = normalize_2nd_moment(z.to(torch.float32)) + if self.c_dim > 0: + y = normalize_2nd_moment(self.embed(c.to(torch.float32))) + x = torch.cat([x, y], dim=1) if x is not None else y + + # Main layers. + for idx in range(self.num_layers): + layer = getattr(self, f'fc{idx}') + x = layer(x) + + # Update moving average of W. + if self.w_avg_beta is not None and self.training and not skip_w_avg_update: + with torch.autograd.profiler.record_function('update_w_avg'): + self.w_avg.copy_(x.detach().mean(dim=0).lerp(self.w_avg, self.w_avg_beta)) + + # Broadcast. + if self.num_ws is not None: + with torch.autograd.profiler.record_function('broadcast'): + x = x.unsqueeze(1).repeat([1, self.num_ws, 1]) + + # Apply truncation. + if truncation_psi != 1: + with torch.autograd.profiler.record_function('truncate'): + assert self.w_avg_beta is not None + if self.num_ws is None or truncation_cutoff is None: + x = self.w_avg.lerp(x, truncation_psi) + else: + x[:, :truncation_cutoff] = self.w_avg.lerp(x[:, :truncation_cutoff], truncation_psi) + return x + + +class Generator(torch.nn.Module): + def __init__(self, + z_dim, # Input latent (Z) dimensionality. + c_dim, # Conditioning label (C) dimensionality. + w_dim, # Intermediate latent (W) dimensionality. + img_resolution, # Output resolution. + img_channels, # Number of output color channels. + encoder_kwargs={}, # Arguments for EncoderNetwork. + mapping_kwargs={}, # Arguments for MappingNetwork. + synthesis_kwargs={}, # Arguments for SynthesisNetwork. 
+ ): + super().__init__() + self.z_dim = z_dim + self.c_dim = c_dim + self.w_dim = w_dim + self.img_resolution = img_resolution + self.img_channels = img_channels + self.encoder = EncoderNetwork(c_dim=c_dim, z_dim=z_dim, img_resolution=img_resolution, + img_channels=img_channels, **encoder_kwargs) + self.synthesis = SynthesisNetwork(z_dim=z_dim, w_dim=w_dim, img_resolution=img_resolution, + img_channels=img_channels, **synthesis_kwargs) + self.num_ws = self.synthesis.num_ws + self.mapping = MappingNetwork(z_dim=z_dim, c_dim=c_dim, w_dim=w_dim, num_ws=self.num_ws, **mapping_kwargs) + + def forward(self, img, c, fname=None, truncation_psi=1, truncation_cutoff=None, **synthesis_kwargs): + mask = img[:, -1].unsqueeze(1) + x_global, z, feats = self.encoder(img, c) + ws = self.mapping(z, c, truncation_psi=truncation_psi, truncation_cutoff=truncation_cutoff) + img = self.synthesis(x_global, mask, feats, ws, fname=fname, **synthesis_kwargs) + return img + + +FCF_MODEL_URL = os.environ.get( + "FCF_MODEL_URL", + # "https://github.com/Sanster/models/releases/download/add_mat/Places_512_FullData_G.pth", + "/Users/qing/code/github/FcF-Inpainting/places_512_G.pth" +) + + +class FcF(InpaintModel): + min_size = 512 + pad_mod = 512 + pad_to_square = True + + def init_model(self, device): + seed = 0 + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False + + kwargs = {'channel_base': 1 * 32768, 'channel_max': 512, 'num_fp16_res': 4, 'conv_clamp': 256} + G = Generator(z_dim=512, c_dim=0, w_dim=512, img_resolution=512, img_channels=3, + synthesis_kwargs=kwargs, encoder_kwargs=kwargs, mapping_kwargs={'num_layers': 2}) + self.model = load_model(G, FCF_MODEL_URL, device) + self.label = torch.zeros([1, self.model.c_dim], device=device) + + @staticmethod + def is_downloaded() -> bool: + return os.path.exists(get_cache_path_by_url(FCF_MODEL_URL)) + + def forward(self, image, mask, config: Config): + """Input images and output images have same size + images: [H, W, C] RGB + masks: [H, W] mask area == 255 + return: BGR IMAGE + """ + + image = norm_img(image) # [0, 1] + image = image * 2 - 1 # [0, 1] -> [-1, 1] + mask = (mask > 120) * 255 + mask = norm_img(mask) + + image = torch.from_numpy(image).unsqueeze(0).to(self.device) + mask = torch.from_numpy(mask).unsqueeze(0).to(self.device) + + erased_img = image * (1 - mask) + input_image = torch.cat([0.5 - mask, erased_img], dim=1) + + output = self.model(input_image, self.label, truncation_psi=0.1, noise_mode='none') + output = (output.permute(0, 2, 3, 1) * 127.5 + 127.5).round().clamp(0, 255).to(torch.uint8) + output = output[0].cpu().numpy() + cur_res = cv2.cvtColor(output, cv2.COLOR_RGB2BGR) + return cur_res diff --git a/lama_cleaner/model/mat.py b/lama_cleaner/model/mat.py index cb2fafb..dbd634f 100644 --- a/lama_cleaner/model/mat.py +++ b/lama_cleaner/model/mat.py @@ -1,615 +1,20 @@ -import collections import os -from itertools import repeat -from typing import Any import random import cv2 +import numpy as np import torch import torch.nn as nn import torch.nn.functional as F +import torch.utils.checkpoint as checkpoint from lama_cleaner.helper import load_model, get_cache_path_by_url, norm_img from lama_cleaner.model.base import InpaintModel -from torch.nn.functional import conv2d, conv_transpose2d -import torch.utils.checkpoint as checkpoint -import numpy as np - +from lama_cleaner.model.utils import setup_filter, Conv2dLayer, 
FullyConnectedLayer, conv2d_resample, bias_act, \ + upsample2d, activation_funcs, MinibatchStdLayer, to_2tuple, normalize_2nd_moment from lama_cleaner.schema import Config -class EasyDict(dict): - """Convenience class that behaves like a dict but allows access with the attribute syntax.""" - - def __getattr__(self, name: str) -> Any: - try: - return self[name] - except KeyError: - raise AttributeError(name) - - def __setattr__(self, name: str, value: Any) -> None: - self[name] = value - - def __delattr__(self, name: str) -> None: - del self[name] - - -activation_funcs = { - 'linear': EasyDict(func=lambda x, **_: x, def_alpha=0, def_gain=1, cuda_idx=1, ref='', has_2nd_grad=False), - 'relu': EasyDict(func=lambda x, **_: torch.nn.functional.relu(x), def_alpha=0, def_gain=np.sqrt(2), cuda_idx=2, - ref='y', has_2nd_grad=False), - 'lrelu': EasyDict(func=lambda x, alpha, **_: torch.nn.functional.leaky_relu(x, alpha), def_alpha=0.2, - def_gain=np.sqrt(2), cuda_idx=3, ref='y', has_2nd_grad=False), - 'tanh': EasyDict(func=lambda x, **_: torch.tanh(x), def_alpha=0, def_gain=1, cuda_idx=4, ref='y', - has_2nd_grad=True), - 'sigmoid': EasyDict(func=lambda x, **_: torch.sigmoid(x), def_alpha=0, def_gain=1, cuda_idx=5, ref='y', - has_2nd_grad=True), - 'elu': EasyDict(func=lambda x, **_: torch.nn.functional.elu(x), def_alpha=0, def_gain=1, cuda_idx=6, ref='y', - has_2nd_grad=True), - 'selu': EasyDict(func=lambda x, **_: torch.nn.functional.selu(x), def_alpha=0, def_gain=1, cuda_idx=7, ref='y', - has_2nd_grad=True), - 'softplus': EasyDict(func=lambda x, **_: torch.nn.functional.softplus(x), def_alpha=0, def_gain=1, cuda_idx=8, - ref='y', has_2nd_grad=True), - 'swish': EasyDict(func=lambda x, **_: torch.sigmoid(x) * x, def_alpha=0, def_gain=np.sqrt(2), cuda_idx=9, ref='x', - has_2nd_grad=True), -} - - -def _ntuple(n): - def parse(x): - if isinstance(x, collections.abc.Iterable): - return x - return tuple(repeat(x, n)) - - return parse - - -to_2tuple = _ntuple(2) - - -def _bias_act_ref(x, b=None, dim=1, act='linear', alpha=None, gain=None, clamp=None): - """Slow reference implementation of `bias_act()` using standard TensorFlow ops. - """ - assert isinstance(x, torch.Tensor) - assert clamp is None or clamp >= 0 - spec = activation_funcs[act] - alpha = float(alpha if alpha is not None else spec.def_alpha) - gain = float(gain if gain is not None else spec.def_gain) - clamp = float(clamp if clamp is not None else -1) - - # Add bias. - if b is not None: - assert isinstance(b, torch.Tensor) and b.ndim == 1 - assert 0 <= dim < x.ndim - assert b.shape[0] == x.shape[dim] - x = x + b.reshape([-1 if i == dim else 1 for i in range(x.ndim)]) - - # Evaluate activation function. - alpha = float(alpha) - x = spec.func(x, alpha=alpha) - - # Scale by gain. - gain = float(gain) - if gain != 1: - x = x * gain - - # Clamp. - if clamp >= 0: - x = x.clamp(-clamp, clamp) # pylint: disable=invalid-unary-operand-type - return x - - -def bias_act(x, b=None, dim=1, act='linear', alpha=None, gain=None, clamp=None, impl='ref'): - r"""Fused bias and activation function. - - Adds bias `b` to activation tensor `x`, evaluates activation function `act`, - and scales the result by `gain`. Each of the steps is optional. In most cases, - the fused op is considerably more efficient than performing the same calculation - using standard PyTorch ops. It supports first and second order gradients, - but not third order gradients. - - Args: - x: Input activation tensor. Can be of any shape. - b: Bias vector, or `None` to disable. 
Must be a 1D tensor of the same type - as `x`. The shape must be known, and it must match the dimension of `x` - corresponding to `dim`. - dim: The dimension in `x` corresponding to the elements of `b`. - The value of `dim` is ignored if `b` is not specified. - act: Name of the activation function to evaluate, or `"linear"` to disable. - Can be e.g. `"relu"`, `"lrelu"`, `"tanh"`, `"sigmoid"`, `"swish"`, etc. - See `activation_funcs` for a full list. `None` is not allowed. - alpha: Shape parameter for the activation function, or `None` to use the default. - gain: Scaling factor for the output tensor, or `None` to use default. - See `activation_funcs` for the default scaling of each activation function. - If unsure, consider specifying 1. - clamp: Clamp the output values to `[-clamp, +clamp]`, or `None` to disable - the clamping (default). - impl: Name of the implementation to use. Can be `"ref"` or `"cuda"` (default). - - Returns: - Tensor of the same shape and datatype as `x`. - """ - assert isinstance(x, torch.Tensor) - assert impl in ['ref', 'cuda'] - return _bias_act_ref(x=x, b=b, dim=dim, act=act, alpha=alpha, gain=gain, clamp=clamp) - - -def _get_filter_size(f): - if f is None: - return 1, 1 - - assert isinstance(f, torch.Tensor) and f.ndim in [1, 2] - fw = f.shape[-1] - fh = f.shape[0] - - fw = int(fw) - fh = int(fh) - assert fw >= 1 and fh >= 1 - return fw, fh - - -def _get_weight_shape(w): - shape = [int(sz) for sz in w.shape] - return shape - - -def _parse_scaling(scaling): - if isinstance(scaling, int): - scaling = [scaling, scaling] - assert isinstance(scaling, (list, tuple)) - assert all(isinstance(x, int) for x in scaling) - sx, sy = scaling - assert sx >= 1 and sy >= 1 - return sx, sy - - -def _parse_padding(padding): - if isinstance(padding, int): - padding = [padding, padding] - assert isinstance(padding, (list, tuple)) - assert all(isinstance(x, int) for x in padding) - if len(padding) == 2: - padx, pady = padding - padding = [padx, padx, pady, pady] - padx0, padx1, pady0, pady1 = padding - return padx0, padx1, pady0, pady1 - - -def setup_filter(f, device=torch.device('cpu'), normalize=True, flip_filter=False, gain=1, separable=None): - r"""Convenience function to setup 2D FIR filter for `upfirdn2d()`. - - Args: - f: Torch tensor, numpy array, or python list of the shape - `[filter_height, filter_width]` (non-separable), - `[filter_taps]` (separable), - `[]` (impulse), or - `None` (identity). - device: Result device (default: cpu). - normalize: Normalize the filter so that it retains the magnitude - for constant input signal (DC)? (default: True). - flip_filter: Flip the filter? (default: False). - gain: Overall scaling factor for signal magnitude (default: 1). - separable: Return a separable filter? (default: select automatically). - - Returns: - Float32 tensor of the shape - `[filter_height, filter_width]` (non-separable) or - `[filter_taps]` (separable). - """ - # Validate. - if f is None: - f = 1 - f = torch.as_tensor(f, dtype=torch.float32) - assert f.ndim in [0, 1, 2] - assert f.numel() > 0 - if f.ndim == 0: - f = f[np.newaxis] - - # Separable? - if separable is None: - separable = (f.ndim == 1 and f.numel() >= 8) - if f.ndim == 1 and not separable: - f = f.ger(f) - assert f.ndim == (1 if separable else 2) - - # Apply normalize, flip, gain, and device. 
- if normalize: - f /= f.sum() - if flip_filter: - f = f.flip(list(range(f.ndim))) - f = f * (gain ** (f.ndim / 2)) - f = f.to(device=device) - return f - - -def upfirdn2d(x, f, up=1, down=1, padding=0, flip_filter=False, gain=1, impl='cuda'): - r"""Pad, upsample, filter, and downsample a batch of 2D images. - - Performs the following sequence of operations for each channel: - - 1. Upsample the image by inserting N-1 zeros after each pixel (`up`). - - 2. Pad the image with the specified number of zeros on each side (`padding`). - Negative padding corresponds to cropping the image. - - 3. Convolve the image with the specified 2D FIR filter (`f`), shrinking it - so that the footprint of all output pixels lies within the input image. - - 4. Downsample the image by keeping every Nth pixel (`down`). - - This sequence of operations bears close resemblance to scipy.signal.upfirdn(). - The fused op is considerably more efficient than performing the same calculation - using standard PyTorch ops. It supports gradients of arbitrary order. - - Args: - x: Float32/float64/float16 input tensor of the shape - `[batch_size, num_channels, in_height, in_width]`. - f: Float32 FIR filter of the shape - `[filter_height, filter_width]` (non-separable), - `[filter_taps]` (separable), or - `None` (identity). - up: Integer upsampling factor. Can be a single int or a list/tuple - `[x, y]` (default: 1). - down: Integer downsampling factor. Can be a single int or a list/tuple - `[x, y]` (default: 1). - padding: Padding with respect to the upsampled image. Can be a single number - or a list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` - (default: 0). - flip_filter: False = convolution, True = correlation (default: False). - gain: Overall scaling factor for signal magnitude (default: 1). - impl: Implementation to use. Can be `'ref'` or `'cuda'` (default: `'cuda'`). - - Returns: - Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. - """ - # assert isinstance(x, torch.Tensor) - # assert impl in ['ref', 'cuda'] - return _upfirdn2d_ref(x, f, up=up, down=down, padding=padding, flip_filter=flip_filter, gain=gain) - - -def _upfirdn2d_ref(x, f, up=1, down=1, padding=0, flip_filter=False, gain=1): - """Slow reference implementation of `upfirdn2d()` using standard PyTorch ops. - """ - # Validate arguments. - assert isinstance(x, torch.Tensor) and x.ndim == 4 - if f is None: - f = torch.ones([1, 1], dtype=torch.float32, device=x.device) - assert isinstance(f, torch.Tensor) and f.ndim in [1, 2] - assert f.dtype == torch.float32 and not f.requires_grad - batch_size, num_channels, in_height, in_width = x.shape - # upx, upy = _parse_scaling(up) - # downx, downy = _parse_scaling(down) - - upx, upy = up, up - downx, downy = down, down - - # padx0, padx1, pady0, pady1 = _parse_padding(padding) - padx0, padx1, pady0, pady1 = padding[0], padding[1], padding[2], padding[3] - - # Upsample by inserting zeros. - x = x.reshape([batch_size, num_channels, in_height, 1, in_width, 1]) - x = torch.nn.functional.pad(x, [0, upx - 1, 0, 0, 0, upy - 1]) - x = x.reshape([batch_size, num_channels, in_height * upy, in_width * upx]) - - # Pad or crop. - x = torch.nn.functional.pad(x, [max(padx0, 0), max(padx1, 0), max(pady0, 0), max(pady1, 0)]) - x = x[:, :, max(-pady0, 0): x.shape[2] - max(-pady1, 0), max(-padx0, 0): x.shape[3] - max(-padx1, 0)] - - # Setup filter. - f = f * (gain ** (f.ndim / 2)) - f = f.to(x.dtype) - if not flip_filter: - f = f.flip(list(range(f.ndim))) - - # Convolve with the filter. 
- f = f[np.newaxis, np.newaxis].repeat([num_channels, 1] + [1] * f.ndim) - if f.ndim == 4: - x = conv2d(input=x, weight=f, groups=num_channels) - else: - x = conv2d(input=x, weight=f.unsqueeze(2), groups=num_channels) - x = conv2d(input=x, weight=f.unsqueeze(3), groups=num_channels) - - # Downsample by throwing away pixels. - x = x[:, :, ::downy, ::downx] - return x - - -def upsample2d(x, f, up=2, padding=0, flip_filter=False, gain=1, impl='cuda'): - r"""Upsample a batch of 2D images using the given 2D FIR filter. - - By default, the result is padded so that its shape is a multiple of the input. - User-specified padding is applied on top of that, with negative values - indicating cropping. Pixels outside the image are assumed to be zero. - - Args: - x: Float32/float64/float16 input tensor of the shape - `[batch_size, num_channels, in_height, in_width]`. - f: Float32 FIR filter of the shape - `[filter_height, filter_width]` (non-separable), - `[filter_taps]` (separable), or - `None` (identity). - up: Integer upsampling factor. Can be a single int or a list/tuple - `[x, y]` (default: 1). - padding: Padding with respect to the output. Can be a single number or a - list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` - (default: 0). - flip_filter: False = convolution, True = correlation (default: False). - gain: Overall scaling factor for signal magnitude (default: 1). - impl: Implementation to use. Can be `'ref'` or `'cuda'` (default: `'cuda'`). - - Returns: - Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. - """ - upx, upy = _parse_scaling(up) - # upx, upy = up, up - padx0, padx1, pady0, pady1 = _parse_padding(padding) - # padx0, padx1, pady0, pady1 = padding, padding, padding, padding - fw, fh = _get_filter_size(f) - p = [ - padx0 + (fw + upx - 1) // 2, - padx1 + (fw - upx) // 2, - pady0 + (fh + upy - 1) // 2, - pady1 + (fh - upy) // 2, - ] - return upfirdn2d(x, f, up=up, padding=p, flip_filter=flip_filter, gain=gain * upx * upy, impl=impl) - - -def downsample2d(x, f, down=2, padding=0, flip_filter=False, gain=1, impl='cuda'): - r"""Downsample a batch of 2D images using the given 2D FIR filter. - - By default, the result is padded so that its shape is a fraction of the input. - User-specified padding is applied on top of that, with negative values - indicating cropping. Pixels outside the image are assumed to be zero. - - Args: - x: Float32/float64/float16 input tensor of the shape - `[batch_size, num_channels, in_height, in_width]`. - f: Float32 FIR filter of the shape - `[filter_height, filter_width]` (non-separable), - `[filter_taps]` (separable), or - `None` (identity). - down: Integer downsampling factor. Can be a single int or a list/tuple - `[x, y]` (default: 1). - padding: Padding with respect to the input. Can be a single number or a - list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` - (default: 0). - flip_filter: False = convolution, True = correlation (default: False). - gain: Overall scaling factor for signal magnitude (default: 1). - impl: Implementation to use. Can be `'ref'` or `'cuda'` (default: `'cuda'`). - - Returns: - Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. 
- """ - downx, downy = _parse_scaling(down) - # padx0, padx1, pady0, pady1 = _parse_padding(padding) - padx0, padx1, pady0, pady1 = padding, padding, padding, padding - - fw, fh = _get_filter_size(f) - p = [ - padx0 + (fw - downx + 1) // 2, - padx1 + (fw - downx) // 2, - pady0 + (fh - downy + 1) // 2, - pady1 + (fh - downy) // 2, - ] - return upfirdn2d(x, f, down=down, padding=p, flip_filter=flip_filter, gain=gain, impl=impl) - - -def _conv2d_wrapper(x, w, stride=1, padding=0, groups=1, transpose=False, flip_weight=True): - """Wrapper for the underlying `conv2d()` and `conv_transpose2d()` implementations. - """ - out_channels, in_channels_per_group, kh, kw = _get_weight_shape(w) - - # Flip weight if requested. - if not flip_weight: # conv2d() actually performs correlation (flip_weight=True) not convolution (flip_weight=False). - w = w.flip([2, 3]) - - # Workaround performance pitfall in cuDNN 8.0.5, triggered when using - # 1x1 kernel + memory_format=channels_last + less than 64 channels. - if kw == 1 and kh == 1 and stride == 1 and padding in [0, [0, 0], (0, 0)] and not transpose: - if x.stride()[1] == 1 and min(out_channels, in_channels_per_group) < 64: - if out_channels <= 4 and groups == 1: - in_shape = x.shape - x = w.squeeze(3).squeeze(2) @ x.reshape([in_shape[0], in_channels_per_group, -1]) - x = x.reshape([in_shape[0], out_channels, in_shape[2], in_shape[3]]) - else: - x = x.to(memory_format=torch.contiguous_format) - w = w.to(memory_format=torch.contiguous_format) - x = conv2d(x, w, groups=groups) - return x.to(memory_format=torch.channels_last) - - # Otherwise => execute using conv2d_gradfix. - op = conv_transpose2d if transpose else conv2d - return op(x, w, stride=stride, padding=padding, groups=groups) - - -def conv2d_resample(x, w, f=None, up=1, down=1, padding=0, groups=1, flip_weight=True, flip_filter=False): - r"""2D convolution with optional up/downsampling. - - Padding is performed only once at the beginning, not between the operations. - - Args: - x: Input tensor of shape - `[batch_size, in_channels, in_height, in_width]`. - w: Weight tensor of shape - `[out_channels, in_channels//groups, kernel_height, kernel_width]`. - f: Low-pass filter for up/downsampling. Must be prepared beforehand by - calling setup_filter(). None = identity (default). - up: Integer upsampling factor (default: 1). - down: Integer downsampling factor (default: 1). - padding: Padding with respect to the upsampled image. Can be a single number - or a list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` - (default: 0). - groups: Split input channels into N groups (default: 1). - flip_weight: False = convolution, True = correlation (default: True). - flip_filter: False = convolution, True = correlation (default: False). - - Returns: - Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. - """ - # Validate arguments. - assert isinstance(x, torch.Tensor) and (x.ndim == 4) - assert isinstance(w, torch.Tensor) and (w.ndim == 4) and (w.dtype == x.dtype) - assert f is None or (isinstance(f, torch.Tensor) and f.ndim in [1, 2] and f.dtype == torch.float32) - assert isinstance(up, int) and (up >= 1) - assert isinstance(down, int) and (down >= 1) - # assert isinstance(groups, int) and (groups >= 1), f"!!!!!! 
groups: {groups} isinstance(groups, int) {isinstance(groups, int)} {type(groups)}" - out_channels, in_channels_per_group, kh, kw = _get_weight_shape(w) - fw, fh = _get_filter_size(f) - # px0, px1, py0, py1 = _parse_padding(padding) - px0, px1, py0, py1 = padding, padding, padding, padding - - # Adjust padding to account for up/downsampling. - if up > 1: - px0 += (fw + up - 1) // 2 - px1 += (fw - up) // 2 - py0 += (fh + up - 1) // 2 - py1 += (fh - up) // 2 - if down > 1: - px0 += (fw - down + 1) // 2 - px1 += (fw - down) // 2 - py0 += (fh - down + 1) // 2 - py1 += (fh - down) // 2 - - # Fast path: 1x1 convolution with downsampling only => downsample first, then convolve. - if kw == 1 and kh == 1 and (down > 1 and up == 1): - x = upfirdn2d(x=x, f=f, down=down, padding=[px0, px1, py0, py1], flip_filter=flip_filter) - x = _conv2d_wrapper(x=x, w=w, groups=groups, flip_weight=flip_weight) - return x - - # Fast path: 1x1 convolution with upsampling only => convolve first, then upsample. - if kw == 1 and kh == 1 and (up > 1 and down == 1): - x = _conv2d_wrapper(x=x, w=w, groups=groups, flip_weight=flip_weight) - x = upfirdn2d(x=x, f=f, up=up, padding=[px0, px1, py0, py1], gain=up ** 2, flip_filter=flip_filter) - return x - - # Fast path: downsampling only => use strided convolution. - if down > 1 and up == 1: - x = upfirdn2d(x=x, f=f, padding=[px0, px1, py0, py1], flip_filter=flip_filter) - x = _conv2d_wrapper(x=x, w=w, stride=down, groups=groups, flip_weight=flip_weight) - return x - - # Fast path: upsampling with optional downsampling => use transpose strided convolution. - if up > 1: - if groups == 1: - w = w.transpose(0, 1) - else: - w = w.reshape(groups, out_channels // groups, in_channels_per_group, kh, kw) - w = w.transpose(1, 2) - w = w.reshape(groups * in_channels_per_group, out_channels // groups, kh, kw) - px0 -= kw - 1 - px1 -= kw - up - py0 -= kh - 1 - py1 -= kh - up - pxt = max(min(-px0, -px1), 0) - pyt = max(min(-py0, -py1), 0) - x = _conv2d_wrapper(x=x, w=w, stride=up, padding=[pyt, pxt], groups=groups, transpose=True, - flip_weight=(not flip_weight)) - x = upfirdn2d(x=x, f=f, padding=[px0 + pxt, px1 + pxt, py0 + pyt, py1 + pyt], gain=up ** 2, - flip_filter=flip_filter) - if down > 1: - x = upfirdn2d(x=x, f=f, down=down, flip_filter=flip_filter) - return x - - # Fast path: no up/downsampling, padding supported by the underlying implementation => use plain conv2d. - if up == 1 and down == 1: - if px0 == px1 and py0 == py1 and px0 >= 0 and py0 >= 0: - return _conv2d_wrapper(x=x, w=w, padding=[py0, px0], groups=groups, flip_weight=flip_weight) - - # Fallback: Generic reference implementation. - x = upfirdn2d(x=x, f=(f if up > 1 else None), up=up, padding=[px0, px1, py0, py1], gain=up ** 2, - flip_filter=flip_filter) - x = _conv2d_wrapper(x=x, w=w, groups=groups, flip_weight=flip_weight) - if down > 1: - x = upfirdn2d(x=x, f=f, down=down, flip_filter=flip_filter) - return x - - -# ---------------------------------------------------------------------------- - - -def normalize_2nd_moment(x, dim=1, eps=1e-8): - return x * (x.square().mean(dim=dim, keepdim=True) + eps).rsqrt() - - -class FullyConnectedLayer(nn.Module): - def __init__(self, - in_features, # Number of input features. - out_features, # Number of output features. - bias=True, # Apply additive bias before the activation function? - activation='linear', # Activation function: 'relu', 'lrelu', etc. - lr_multiplier=1, # Learning rate multiplier. - bias_init=0, # Initial value for the additive bias. 
- ): - super().__init__() - self.weight = torch.nn.Parameter(torch.randn([out_features, in_features]) / lr_multiplier) - self.bias = torch.nn.Parameter(torch.full([out_features], np.float32(bias_init))) if bias else None - self.activation = activation - - self.weight_gain = lr_multiplier / np.sqrt(in_features) - self.bias_gain = lr_multiplier - - def forward(self, x): - w = self.weight * self.weight_gain - b = self.bias - if b is not None and self.bias_gain != 1: - b = b * self.bias_gain - - if self.activation == 'linear' and b is not None: - # out = torch.addmm(b.unsqueeze(0), x, w.t()) - x = x.matmul(w.t()) - out = x + b.reshape([-1 if i == x.ndim - 1 else 1 for i in range(x.ndim)]) - else: - x = x.matmul(w.t()) - out = bias_act(x, b, act=self.activation, dim=x.ndim - 1) - return out - - -class Conv2dLayer(nn.Module): - def __init__(self, - in_channels, # Number of input channels. - out_channels, # Number of output channels. - kernel_size, # Width and height of the convolution kernel. - bias=True, # Apply additive bias before the activation function? - activation='linear', # Activation function: 'relu', 'lrelu', etc. - up=1, # Integer upsampling factor. - down=1, # Integer downsampling factor. - resample_filter=[1, 3, 3, 1], # Low-pass filter to apply when resampling activations. - conv_clamp=None, # Clamp the output to +-X, None = disable clamping. - trainable=True, # Update the weights of this layer during training? - ): - super().__init__() - self.activation = activation - self.up = up - self.down = down - self.register_buffer('resample_filter', setup_filter(resample_filter)) - self.conv_clamp = conv_clamp - self.padding = kernel_size // 2 - self.weight_gain = 1 / np.sqrt(in_channels * (kernel_size ** 2)) - self.act_gain = activation_funcs[activation].def_gain - - weight = torch.randn([out_channels, in_channels, kernel_size, kernel_size]) - bias = torch.zeros([out_channels]) if bias else None - if trainable: - self.weight = torch.nn.Parameter(weight) - self.bias = torch.nn.Parameter(bias) if bias is not None else None - else: - self.register_buffer('weight', weight) - if bias is not None: - self.register_buffer('bias', bias) - else: - self.bias = None - - def forward(self, x, gain=1): - w = self.weight * self.weight_gain - x = conv2d_resample(x=x, w=w, f=self.resample_filter, up=self.up, down=self.down, - padding=self.padding) - - act_gain = self.act_gain * gain - act_clamp = self.conv_clamp * gain if self.conv_clamp is not None else None - out = bias_act(x, self.bias, act=self.activation, gain=act_gain, clamp=act_clamp) - return out - - class ModulatedConv2d(nn.Module): def __init__(self, in_channels, # Number of input channels. @@ -983,31 +388,6 @@ class DisBlock(nn.Module): return out -class MinibatchStdLayer(torch.nn.Module): - def __init__(self, group_size, num_channels=1): - super().__init__() - self.group_size = group_size - self.num_channels = num_channels - - def forward(self, x): - N, C, H, W = x.shape - G = torch.min(torch.as_tensor(self.group_size), - torch.as_tensor(N)) if self.group_size is not None else N - F = self.num_channels - c = C // F - - y = x.reshape(G, -1, F, c, H, - W) # [GnFcHW] Split minibatch N into n groups of size G, and channels C into F groups of size c. - y = y - y.mean(dim=0) # [GnFcHW] Subtract mean over group. - y = y.square().mean(dim=0) # [nFcHW] Calc variance over group. - y = (y + 1e-8).sqrt() # [nFcHW] Calc stddev over group. - y = y.mean(dim=[2, 3, 4]) # [nF] Take average over channels and pixels. 
- y = y.reshape(-1, F, 1, 1) # [nF11] Add missing dimensions. - y = y.repeat(G, 1, H, W) # [NFHW] Replicate over group and pixels. - x = torch.cat([x, y], dim=1) # [NCHW] Append to input as new channels. - return x - - class Discriminator(torch.nn.Module): def __init__(self, c_dim, # Conditioning label (C) dimensionality. @@ -2030,7 +1410,7 @@ class MAT(InpaintModel): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) - + G = Generator(z_dim=512, c_dim=0, w_dim=512, img_resolution=512, img_channels=3) self.model = load_model(G, MAT_MODEL_URL, device) self.z = torch.from_numpy(np.random.randn(1, G.z_dim)).to(device) # [1., 512] diff --git a/lama_cleaner/model/utils.py b/lama_cleaner/model/utils.py index 1158a44..9b4ee19 100644 --- a/lama_cleaner/model/utils.py +++ b/lama_cleaner/model/utils.py @@ -1,7 +1,12 @@ import math +from typing import Any import torch import numpy as np +import collections +from itertools import repeat + +from torch import conv2d, conv_transpose2d def make_beta_schedule(device, schedule, n_timestep, linear_start=1e-4, linear_end=2e-2, cosine_s=8e-3): @@ -84,3 +89,621 @@ def timestep_embedding(device, timesteps, dim, max_period=10000, repeat_only=Fal if dim % 2: embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1) return embedding + + +###### MAT and FcF ####### + + +def normalize_2nd_moment(x, dim=1, eps=1e-8): + return x * (x.square().mean(dim=dim, keepdim=True) + eps).rsqrt() + + +class EasyDict(dict): + """Convenience class that behaves like a dict but allows access with the attribute syntax.""" + + def __getattr__(self, name: str) -> Any: + try: + return self[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name: str, value: Any) -> None: + self[name] = value + + def __delattr__(self, name: str) -> None: + del self[name] + + +def _bias_act_ref(x, b=None, dim=1, act='linear', alpha=None, gain=None, clamp=None): + """Slow reference implementation of `bias_act()` using standard TensorFlow ops. + """ + assert isinstance(x, torch.Tensor) + assert clamp is None or clamp >= 0 + spec = activation_funcs[act] + alpha = float(alpha if alpha is not None else spec.def_alpha) + gain = float(gain if gain is not None else spec.def_gain) + clamp = float(clamp if clamp is not None else -1) + + # Add bias. + if b is not None: + assert isinstance(b, torch.Tensor) and b.ndim == 1 + assert 0 <= dim < x.ndim + assert b.shape[0] == x.shape[dim] + x = x + b.reshape([-1 if i == dim else 1 for i in range(x.ndim)]) + + # Evaluate activation function. + alpha = float(alpha) + x = spec.func(x, alpha=alpha) + + # Scale by gain. + gain = float(gain) + if gain != 1: + x = x * gain + + # Clamp. + if clamp >= 0: + x = x.clamp(-clamp, clamp) # pylint: disable=invalid-unary-operand-type + return x + + +def bias_act(x, b=None, dim=1, act='linear', alpha=None, gain=None, clamp=None, impl='ref'): + r"""Fused bias and activation function. + + Adds bias `b` to activation tensor `x`, evaluates activation function `act`, + and scales the result by `gain`. Each of the steps is optional. In most cases, + the fused op is considerably more efficient than performing the same calculation + using standard PyTorch ops. It supports first and second order gradients, + but not third order gradients. + + Args: + x: Input activation tensor. Can be of any shape. + b: Bias vector, or `None` to disable. Must be a 1D tensor of the same type + as `x`. The shape must be known, and it must match the dimension of `x` + corresponding to `dim`. 
+ dim: The dimension in `x` corresponding to the elements of `b`. + The value of `dim` is ignored if `b` is not specified. + act: Name of the activation function to evaluate, or `"linear"` to disable. + Can be e.g. `"relu"`, `"lrelu"`, `"tanh"`, `"sigmoid"`, `"swish"`, etc. + See `activation_funcs` for a full list. `None` is not allowed. + alpha: Shape parameter for the activation function, or `None` to use the default. + gain: Scaling factor for the output tensor, or `None` to use default. + See `activation_funcs` for the default scaling of each activation function. + If unsure, consider specifying 1. + clamp: Clamp the output values to `[-clamp, +clamp]`, or `None` to disable + the clamping (default). + impl: Name of the implementation to use. Can be `"ref"` or `"cuda"` (default). + + Returns: + Tensor of the same shape and datatype as `x`. + """ + assert isinstance(x, torch.Tensor) + assert impl in ['ref', 'cuda'] + return _bias_act_ref(x=x, b=b, dim=dim, act=act, alpha=alpha, gain=gain, clamp=clamp) + + +def _get_filter_size(f): + if f is None: + return 1, 1 + + assert isinstance(f, torch.Tensor) and f.ndim in [1, 2] + fw = f.shape[-1] + fh = f.shape[0] + + fw = int(fw) + fh = int(fh) + assert fw >= 1 and fh >= 1 + return fw, fh + + +def _get_weight_shape(w): + shape = [int(sz) for sz in w.shape] + return shape + + +def _parse_scaling(scaling): + if isinstance(scaling, int): + scaling = [scaling, scaling] + assert isinstance(scaling, (list, tuple)) + assert all(isinstance(x, int) for x in scaling) + sx, sy = scaling + assert sx >= 1 and sy >= 1 + return sx, sy + + +def _parse_padding(padding): + if isinstance(padding, int): + padding = [padding, padding] + assert isinstance(padding, (list, tuple)) + assert all(isinstance(x, int) for x in padding) + if len(padding) == 2: + padx, pady = padding + padding = [padx, padx, pady, pady] + padx0, padx1, pady0, pady1 = padding + return padx0, padx1, pady0, pady1 + + +def setup_filter(f, device=torch.device('cpu'), normalize=True, flip_filter=False, gain=1, separable=None): + r"""Convenience function to setup 2D FIR filter for `upfirdn2d()`. + + Args: + f: Torch tensor, numpy array, or python list of the shape + `[filter_height, filter_width]` (non-separable), + `[filter_taps]` (separable), + `[]` (impulse), or + `None` (identity). + device: Result device (default: cpu). + normalize: Normalize the filter so that it retains the magnitude + for constant input signal (DC)? (default: True). + flip_filter: Flip the filter? (default: False). + gain: Overall scaling factor for signal magnitude (default: 1). + separable: Return a separable filter? (default: select automatically). + + Returns: + Float32 tensor of the shape + `[filter_height, filter_width]` (non-separable) or + `[filter_taps]` (separable). + """ + # Validate. + if f is None: + f = 1 + f = torch.as_tensor(f, dtype=torch.float32) + assert f.ndim in [0, 1, 2] + assert f.numel() > 0 + if f.ndim == 0: + f = f[np.newaxis] + + # Separable? + if separable is None: + separable = (f.ndim == 1 and f.numel() >= 8) + if f.ndim == 1 and not separable: + f = f.ger(f) + assert f.ndim == (1 if separable else 2) + + # Apply normalize, flip, gain, and device. 
+ if normalize: + f /= f.sum() + if flip_filter: + f = f.flip(list(range(f.ndim))) + f = f * (gain ** (f.ndim / 2)) + f = f.to(device=device) + return f + + +def _ntuple(n): + def parse(x): + if isinstance(x, collections.abc.Iterable): + return x + return tuple(repeat(x, n)) + + return parse + + +to_2tuple = _ntuple(2) + +activation_funcs = { + 'linear': EasyDict(func=lambda x, **_: x, def_alpha=0, def_gain=1, cuda_idx=1, ref='', has_2nd_grad=False), + 'relu': EasyDict(func=lambda x, **_: torch.nn.functional.relu(x), def_alpha=0, def_gain=np.sqrt(2), cuda_idx=2, + ref='y', has_2nd_grad=False), + 'lrelu': EasyDict(func=lambda x, alpha, **_: torch.nn.functional.leaky_relu(x, alpha), def_alpha=0.2, + def_gain=np.sqrt(2), cuda_idx=3, ref='y', has_2nd_grad=False), + 'tanh': EasyDict(func=lambda x, **_: torch.tanh(x), def_alpha=0, def_gain=1, cuda_idx=4, ref='y', + has_2nd_grad=True), + 'sigmoid': EasyDict(func=lambda x, **_: torch.sigmoid(x), def_alpha=0, def_gain=1, cuda_idx=5, ref='y', + has_2nd_grad=True), + 'elu': EasyDict(func=lambda x, **_: torch.nn.functional.elu(x), def_alpha=0, def_gain=1, cuda_idx=6, ref='y', + has_2nd_grad=True), + 'selu': EasyDict(func=lambda x, **_: torch.nn.functional.selu(x), def_alpha=0, def_gain=1, cuda_idx=7, ref='y', + has_2nd_grad=True), + 'softplus': EasyDict(func=lambda x, **_: torch.nn.functional.softplus(x), def_alpha=0, def_gain=1, cuda_idx=8, + ref='y', has_2nd_grad=True), + 'swish': EasyDict(func=lambda x, **_: torch.sigmoid(x) * x, def_alpha=0, def_gain=np.sqrt(2), cuda_idx=9, ref='x', + has_2nd_grad=True), +} + + +def upfirdn2d(x, f, up=1, down=1, padding=0, flip_filter=False, gain=1, impl='cuda'): + r"""Pad, upsample, filter, and downsample a batch of 2D images. + + Performs the following sequence of operations for each channel: + + 1. Upsample the image by inserting N-1 zeros after each pixel (`up`). + + 2. Pad the image with the specified number of zeros on each side (`padding`). + Negative padding corresponds to cropping the image. + + 3. Convolve the image with the specified 2D FIR filter (`f`), shrinking it + so that the footprint of all output pixels lies within the input image. + + 4. Downsample the image by keeping every Nth pixel (`down`). + + This sequence of operations bears close resemblance to scipy.signal.upfirdn(). + The fused op is considerably more efficient than performing the same calculation + using standard PyTorch ops. It supports gradients of arbitrary order. + + Args: + x: Float32/float64/float16 input tensor of the shape + `[batch_size, num_channels, in_height, in_width]`. + f: Float32 FIR filter of the shape + `[filter_height, filter_width]` (non-separable), + `[filter_taps]` (separable), or + `None` (identity). + up: Integer upsampling factor. Can be a single int or a list/tuple + `[x, y]` (default: 1). + down: Integer downsampling factor. Can be a single int or a list/tuple + `[x, y]` (default: 1). + padding: Padding with respect to the upsampled image. Can be a single number + or a list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` + (default: 0). + flip_filter: False = convolution, True = correlation (default: False). + gain: Overall scaling factor for signal magnitude (default: 1). + impl: Implementation to use. Can be `'ref'` or `'cuda'` (default: `'cuda'`). + + Returns: + Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. 
+ """ + # assert isinstance(x, torch.Tensor) + # assert impl in ['ref', 'cuda'] + return _upfirdn2d_ref(x, f, up=up, down=down, padding=padding, flip_filter=flip_filter, gain=gain) + + +def _upfirdn2d_ref(x, f, up=1, down=1, padding=0, flip_filter=False, gain=1): + """Slow reference implementation of `upfirdn2d()` using standard PyTorch ops. + """ + # Validate arguments. + assert isinstance(x, torch.Tensor) and x.ndim == 4 + if f is None: + f = torch.ones([1, 1], dtype=torch.float32, device=x.device) + assert isinstance(f, torch.Tensor) and f.ndim in [1, 2] + assert f.dtype == torch.float32 and not f.requires_grad + batch_size, num_channels, in_height, in_width = x.shape + # upx, upy = _parse_scaling(up) + # downx, downy = _parse_scaling(down) + + upx, upy = up, up + downx, downy = down, down + + # padx0, padx1, pady0, pady1 = _parse_padding(padding) + padx0, padx1, pady0, pady1 = padding[0], padding[1], padding[2], padding[3] + + # Upsample by inserting zeros. + x = x.reshape([batch_size, num_channels, in_height, 1, in_width, 1]) + x = torch.nn.functional.pad(x, [0, upx - 1, 0, 0, 0, upy - 1]) + x = x.reshape([batch_size, num_channels, in_height * upy, in_width * upx]) + + # Pad or crop. + x = torch.nn.functional.pad(x, [max(padx0, 0), max(padx1, 0), max(pady0, 0), max(pady1, 0)]) + x = x[:, :, max(-pady0, 0): x.shape[2] - max(-pady1, 0), max(-padx0, 0): x.shape[3] - max(-padx1, 0)] + + # Setup filter. + f = f * (gain ** (f.ndim / 2)) + f = f.to(x.dtype) + if not flip_filter: + f = f.flip(list(range(f.ndim))) + + # Convolve with the filter. + f = f[np.newaxis, np.newaxis].repeat([num_channels, 1] + [1] * f.ndim) + if f.ndim == 4: + x = conv2d(input=x, weight=f, groups=num_channels) + else: + x = conv2d(input=x, weight=f.unsqueeze(2), groups=num_channels) + x = conv2d(input=x, weight=f.unsqueeze(3), groups=num_channels) + + # Downsample by throwing away pixels. + x = x[:, :, ::downy, ::downx] + return x + + +def downsample2d(x, f, down=2, padding=0, flip_filter=False, gain=1, impl='cuda'): + r"""Downsample a batch of 2D images using the given 2D FIR filter. + + By default, the result is padded so that its shape is a fraction of the input. + User-specified padding is applied on top of that, with negative values + indicating cropping. Pixels outside the image are assumed to be zero. + + Args: + x: Float32/float64/float16 input tensor of the shape + `[batch_size, num_channels, in_height, in_width]`. + f: Float32 FIR filter of the shape + `[filter_height, filter_width]` (non-separable), + `[filter_taps]` (separable), or + `None` (identity). + down: Integer downsampling factor. Can be a single int or a list/tuple + `[x, y]` (default: 1). + padding: Padding with respect to the input. Can be a single number or a + list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` + (default: 0). + flip_filter: False = convolution, True = correlation (default: False). + gain: Overall scaling factor for signal magnitude (default: 1). + impl: Implementation to use. Can be `'ref'` or `'cuda'` (default: `'cuda'`). + + Returns: + Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. 
+ """ + downx, downy = _parse_scaling(down) + # padx0, padx1, pady0, pady1 = _parse_padding(padding) + padx0, padx1, pady0, pady1 = padding, padding, padding, padding + + fw, fh = _get_filter_size(f) + p = [ + padx0 + (fw - downx + 1) // 2, + padx1 + (fw - downx) // 2, + pady0 + (fh - downy + 1) // 2, + pady1 + (fh - downy) // 2, + ] + return upfirdn2d(x, f, down=down, padding=p, flip_filter=flip_filter, gain=gain, impl=impl) + + +def upsample2d(x, f, up=2, padding=0, flip_filter=False, gain=1, impl='cuda'): + r"""Upsample a batch of 2D images using the given 2D FIR filter. + + By default, the result is padded so that its shape is a multiple of the input. + User-specified padding is applied on top of that, with negative values + indicating cropping. Pixels outside the image are assumed to be zero. + + Args: + x: Float32/float64/float16 input tensor of the shape + `[batch_size, num_channels, in_height, in_width]`. + f: Float32 FIR filter of the shape + `[filter_height, filter_width]` (non-separable), + `[filter_taps]` (separable), or + `None` (identity). + up: Integer upsampling factor. Can be a single int or a list/tuple + `[x, y]` (default: 1). + padding: Padding with respect to the output. Can be a single number or a + list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` + (default: 0). + flip_filter: False = convolution, True = correlation (default: False). + gain: Overall scaling factor for signal magnitude (default: 1). + impl: Implementation to use. Can be `'ref'` or `'cuda'` (default: `'cuda'`). + + Returns: + Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. + """ + upx, upy = _parse_scaling(up) + # upx, upy = up, up + padx0, padx1, pady0, pady1 = _parse_padding(padding) + # padx0, padx1, pady0, pady1 = padding, padding, padding, padding + fw, fh = _get_filter_size(f) + p = [ + padx0 + (fw + upx - 1) // 2, + padx1 + (fw - upx) // 2, + pady0 + (fh + upy - 1) // 2, + pady1 + (fh - upy) // 2, + ] + return upfirdn2d(x, f, up=up, padding=p, flip_filter=flip_filter, gain=gain * upx * upy, impl=impl) + + +class MinibatchStdLayer(torch.nn.Module): + def __init__(self, group_size, num_channels=1): + super().__init__() + self.group_size = group_size + self.num_channels = num_channels + + def forward(self, x): + N, C, H, W = x.shape + G = torch.min(torch.as_tensor(self.group_size), + torch.as_tensor(N)) if self.group_size is not None else N + F = self.num_channels + c = C // F + + y = x.reshape(G, -1, F, c, H, + W) # [GnFcHW] Split minibatch N into n groups of size G, and channels C into F groups of size c. + y = y - y.mean(dim=0) # [GnFcHW] Subtract mean over group. + y = y.square().mean(dim=0) # [nFcHW] Calc variance over group. + y = (y + 1e-8).sqrt() # [nFcHW] Calc stddev over group. + y = y.mean(dim=[2, 3, 4]) # [nF] Take average over channels and pixels. + y = y.reshape(-1, F, 1, 1) # [nF11] Add missing dimensions. + y = y.repeat(G, 1, H, W) # [NFHW] Replicate over group and pixels. + x = torch.cat([x, y], dim=1) # [NCHW] Append to input as new channels. + return x + + +class FullyConnectedLayer(torch.nn.Module): + def __init__(self, + in_features, # Number of input features. + out_features, # Number of output features. + bias=True, # Apply additive bias before the activation function? + activation='linear', # Activation function: 'relu', 'lrelu', etc. + lr_multiplier=1, # Learning rate multiplier. + bias_init=0, # Initial value for the additive bias. 
+ ): + super().__init__() + self.weight = torch.nn.Parameter(torch.randn([out_features, in_features]) / lr_multiplier) + self.bias = torch.nn.Parameter(torch.full([out_features], np.float32(bias_init))) if bias else None + self.activation = activation + + self.weight_gain = lr_multiplier / np.sqrt(in_features) + self.bias_gain = lr_multiplier + + def forward(self, x): + w = self.weight * self.weight_gain + b = self.bias + if b is not None and self.bias_gain != 1: + b = b * self.bias_gain + + if self.activation == 'linear' and b is not None: + # out = torch.addmm(b.unsqueeze(0), x, w.t()) + x = x.matmul(w.t()) + out = x + b.reshape([-1 if i == x.ndim - 1 else 1 for i in range(x.ndim)]) + else: + x = x.matmul(w.t()) + out = bias_act(x, b, act=self.activation, dim=x.ndim - 1) + return out + + +def _conv2d_wrapper(x, w, stride=1, padding=0, groups=1, transpose=False, flip_weight=True): + """Wrapper for the underlying `conv2d()` and `conv_transpose2d()` implementations. + """ + out_channels, in_channels_per_group, kh, kw = _get_weight_shape(w) + + # Flip weight if requested. + if not flip_weight: # conv2d() actually performs correlation (flip_weight=True) not convolution (flip_weight=False). + w = w.flip([2, 3]) + + # Workaround performance pitfall in cuDNN 8.0.5, triggered when using + # 1x1 kernel + memory_format=channels_last + less than 64 channels. + if kw == 1 and kh == 1 and stride == 1 and padding in [0, [0, 0], (0, 0)] and not transpose: + if x.stride()[1] == 1 and min(out_channels, in_channels_per_group) < 64: + if out_channels <= 4 and groups == 1: + in_shape = x.shape + x = w.squeeze(3).squeeze(2) @ x.reshape([in_shape[0], in_channels_per_group, -1]) + x = x.reshape([in_shape[0], out_channels, in_shape[2], in_shape[3]]) + else: + x = x.to(memory_format=torch.contiguous_format) + w = w.to(memory_format=torch.contiguous_format) + x = conv2d(x, w, groups=groups) + return x.to(memory_format=torch.channels_last) + + # Otherwise => execute using conv2d_gradfix. + op = conv_transpose2d if transpose else conv2d + return op(x, w, stride=stride, padding=padding, groups=groups) + + +def conv2d_resample(x, w, f=None, up=1, down=1, padding=0, groups=1, flip_weight=True, flip_filter=False): + r"""2D convolution with optional up/downsampling. + + Padding is performed only once at the beginning, not between the operations. + + Args: + x: Input tensor of shape + `[batch_size, in_channels, in_height, in_width]`. + w: Weight tensor of shape + `[out_channels, in_channels//groups, kernel_height, kernel_width]`. + f: Low-pass filter for up/downsampling. Must be prepared beforehand by + calling setup_filter(). None = identity (default). + up: Integer upsampling factor (default: 1). + down: Integer downsampling factor (default: 1). + padding: Padding with respect to the upsampled image. Can be a single number + or a list/tuple `[x, y]` or `[x_before, x_after, y_before, y_after]` + (default: 0). + groups: Split input channels into N groups (default: 1). + flip_weight: False = convolution, True = correlation (default: True). + flip_filter: False = convolution, True = correlation (default: False). + + Returns: + Tensor of the shape `[batch_size, num_channels, out_height, out_width]`. + """ + # Validate arguments. 
+ assert isinstance(x, torch.Tensor) and (x.ndim == 4) + assert isinstance(w, torch.Tensor) and (w.ndim == 4) and (w.dtype == x.dtype) + assert f is None or (isinstance(f, torch.Tensor) and f.ndim in [1, 2] and f.dtype == torch.float32) + assert isinstance(up, int) and (up >= 1) + assert isinstance(down, int) and (down >= 1) + # assert isinstance(groups, int) and (groups >= 1), f"!!!!!! groups: {groups} isinstance(groups, int) {isinstance(groups, int)} {type(groups)}" + out_channels, in_channels_per_group, kh, kw = _get_weight_shape(w) + fw, fh = _get_filter_size(f) + # px0, px1, py0, py1 = _parse_padding(padding) + px0, px1, py0, py1 = padding, padding, padding, padding + + # Adjust padding to account for up/downsampling. + if up > 1: + px0 += (fw + up - 1) // 2 + px1 += (fw - up) // 2 + py0 += (fh + up - 1) // 2 + py1 += (fh - up) // 2 + if down > 1: + px0 += (fw - down + 1) // 2 + px1 += (fw - down) // 2 + py0 += (fh - down + 1) // 2 + py1 += (fh - down) // 2 + + # Fast path: 1x1 convolution with downsampling only => downsample first, then convolve. + if kw == 1 and kh == 1 and (down > 1 and up == 1): + x = upfirdn2d(x=x, f=f, down=down, padding=[px0, px1, py0, py1], flip_filter=flip_filter) + x = _conv2d_wrapper(x=x, w=w, groups=groups, flip_weight=flip_weight) + return x + + # Fast path: 1x1 convolution with upsampling only => convolve first, then upsample. + if kw == 1 and kh == 1 and (up > 1 and down == 1): + x = _conv2d_wrapper(x=x, w=w, groups=groups, flip_weight=flip_weight) + x = upfirdn2d(x=x, f=f, up=up, padding=[px0, px1, py0, py1], gain=up ** 2, flip_filter=flip_filter) + return x + + # Fast path: downsampling only => use strided convolution. + if down > 1 and up == 1: + x = upfirdn2d(x=x, f=f, padding=[px0, px1, py0, py1], flip_filter=flip_filter) + x = _conv2d_wrapper(x=x, w=w, stride=down, groups=groups, flip_weight=flip_weight) + return x + + # Fast path: upsampling with optional downsampling => use transpose strided convolution. + if up > 1: + if groups == 1: + w = w.transpose(0, 1) + else: + w = w.reshape(groups, out_channels // groups, in_channels_per_group, kh, kw) + w = w.transpose(1, 2) + w = w.reshape(groups * in_channels_per_group, out_channels // groups, kh, kw) + px0 -= kw - 1 + px1 -= kw - up + py0 -= kh - 1 + py1 -= kh - up + pxt = max(min(-px0, -px1), 0) + pyt = max(min(-py0, -py1), 0) + x = _conv2d_wrapper(x=x, w=w, stride=up, padding=[pyt, pxt], groups=groups, transpose=True, + flip_weight=(not flip_weight)) + x = upfirdn2d(x=x, f=f, padding=[px0 + pxt, px1 + pxt, py0 + pyt, py1 + pyt], gain=up ** 2, + flip_filter=flip_filter) + if down > 1: + x = upfirdn2d(x=x, f=f, down=down, flip_filter=flip_filter) + return x + + # Fast path: no up/downsampling, padding supported by the underlying implementation => use plain conv2d. + if up == 1 and down == 1: + if px0 == px1 and py0 == py1 and px0 >= 0 and py0 >= 0: + return _conv2d_wrapper(x=x, w=w, padding=[py0, px0], groups=groups, flip_weight=flip_weight) + + # Fallback: Generic reference implementation. + x = upfirdn2d(x=x, f=(f if up > 1 else None), up=up, padding=[px0, px1, py0, py1], gain=up ** 2, + flip_filter=flip_filter) + x = _conv2d_wrapper(x=x, w=w, groups=groups, flip_weight=flip_weight) + if down > 1: + x = upfirdn2d(x=x, f=f, down=down, flip_filter=flip_filter) + return x + + +class Conv2dLayer(torch.nn.Module): + def __init__(self, + in_channels, # Number of input channels. + out_channels, # Number of output channels. + kernel_size, # Width and height of the convolution kernel. 
+ bias=True, # Apply additive bias before the activation function? + activation='linear', # Activation function: 'relu', 'lrelu', etc. + up=1, # Integer upsampling factor. + down=1, # Integer downsampling factor. + resample_filter=[1, 3, 3, 1], # Low-pass filter to apply when resampling activations. + conv_clamp=None, # Clamp the output to +-X, None = disable clamping. + channels_last=False, # Expect the input to have memory_format=channels_last? + trainable=True, # Update the weights of this layer during training? + ): + super().__init__() + self.activation = activation + self.up = up + self.down = down + self.register_buffer('resample_filter', setup_filter(resample_filter)) + self.conv_clamp = conv_clamp + self.padding = kernel_size // 2 + self.weight_gain = 1 / np.sqrt(in_channels * (kernel_size ** 2)) + self.act_gain = activation_funcs[activation].def_gain + + memory_format = torch.channels_last if channels_last else torch.contiguous_format + weight = torch.randn([out_channels, in_channels, kernel_size, kernel_size]).to(memory_format=memory_format) + bias = torch.zeros([out_channels]) if bias else None + if trainable: + self.weight = torch.nn.Parameter(weight) + self.bias = torch.nn.Parameter(bias) if bias is not None else None + else: + self.register_buffer('weight', weight) + if bias is not None: + self.register_buffer('bias', bias) + else: + self.bias = None + + def forward(self, x, gain=1): + w = self.weight * self.weight_gain + x = conv2d_resample(x=x, w=w, f=self.resample_filter, up=self.up, down=self.down, + padding=self.padding) + + act_gain = self.act_gain * gain + act_clamp = self.conv_clamp * gain if self.conv_clamp is not None else None + out = bias_act(x, self.bias, act=self.activation, gain=act_gain, clamp=act_clamp) + return out diff --git a/lama_cleaner/model_manager.py b/lama_cleaner/model_manager.py index 0b4152f..01435f7 100644 --- a/lama_cleaner/model_manager.py +++ b/lama_cleaner/model_manager.py @@ -1,3 +1,4 @@ +from lama_cleaner.model.fcf import FcF from lama_cleaner.model.lama import LaMa from lama_cleaner.model.ldm import LDM from lama_cleaner.model.mat import MAT @@ -8,7 +9,8 @@ models = { 'lama': LaMa, 'ldm': LDM, 'zits': ZITS, - 'mat': MAT + 'mat': MAT, + 'fcf': FcF } diff --git a/lama_cleaner/tests/test_model.py b/lama_cleaner/tests/test_model.py index e3205c3..3eb98be 100644 --- a/lama_cleaner/tests/test_model.py +++ b/lama_cleaner/tests/test_model.py @@ -1,8 +1,6 @@ -import os from pathlib import Path import cv2 -import numpy as np import pytest from lama_cleaner.model_manager import ModelManager @@ -16,14 +14,9 @@ def get_data(fx=1, fy=1.0): img = cv2.cvtColor(img, cv2.COLOR_BGRA2RGB) mask = cv2.imread(str(current_dir / "mask.png"), cv2.IMREAD_GRAYSCALE) - # img = cv2.imread("/Users/qing/code/github/MAT/test_sets/Places/images/test1.jpg") - # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) - # mask = cv2.imread("/Users/qing/code/github/MAT/test_sets/Places/masks/mask1.png", cv2.IMREAD_GRAYSCALE) - # mask = 255 - mask - if fx != 1: - img = cv2.resize(img, None, fx=fx, fy=fy) - mask = cv2.resize(mask, None, fx=fx, fy=fy) + img = cv2.resize(img, None, fx=fx, fy=fy, interpolation=cv2.INTER_AREA) + mask = cv2.resize(mask, None, fx=fx, fy=fy, interpolation=cv2.INTER_NEAREST) return img, mask @@ -134,3 +127,19 @@ def test_mat(strategy): cfg, f"mat_{strategy.capitalize()}_result.png", ) + + +@pytest.mark.parametrize( + "strategy", [HDStrategy.ORIGINAL] +) +def test_fcf(strategy): + model = ModelManager(name="fcf", device="cpu") + cfg = get_config(strategy) + + 
+    assert_equal(
+        model,
+        cfg,
+        f"fcf_{strategy.capitalize()}_result.png",
+        fx=2,
+        fy=2
+    )
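
Note: the FIR helpers that this patch centralizes in lama_cleaner/model/utils.py (setup_filter, upfirdn2d, upsample2d, downsample2d) are shared by MAT and the new FcF model. A minimal CPU smoke-test sketch of the reference path, using only the functions added above (shape comments follow from the default [1, 3, 3, 1] filter):

    import torch
    from lama_cleaner.model.utils import setup_filter, upsample2d, downsample2d

    f = setup_filter([1, 3, 3, 1])        # 4-tap filter -> normalized 4x4 non-separable kernel
    x = torch.randn(1, 3, 64, 64)

    up = upsample2d(x, f, up=2)           # zero-insert + low-pass filter -> [1, 3, 128, 128]
    down = downsample2d(up, f, down=2)    # low-pass filter + keep every 2nd pixel -> [1, 3, 64, 64]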
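Likewise, Conv2dLayer and FullyConnectedLayer now live in utils.py and wrap conv2d_resample() and bias_act(). A small sketch of how they compose; the channel and feature sizes here are arbitrary examples, not taken from the FcF architecture:

    import torch
    from lama_cleaner.model.utils import Conv2dLayer, FullyConnectedLayer

    conv = Conv2dLayer(in_channels=3, out_channels=8, kernel_size=3, activation='lrelu', down=2)
    fc = FullyConnectedLayer(in_features=8 * 32 * 32, out_features=16, activation='lrelu')

    x = torch.randn(2, 3, 64, 64)
    y = conv(x)              # strided-downsample fast path in conv2d_resample -> [2, 8, 32, 32]
    z = fc(y.flatten(1))     # matmul + fused bias_act -> [2, 16]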
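Finally, since 'fcf' is now registered in the models dict, the new backend can be smoke-tested the same way as the other models. The sketch below mirrors test_fcf and assumes it runs inside lama_cleaner/tests/test_model.py, where the get_config() and assert_equal() helpers are defined:

    # Assumes the context of lama_cleaner/tests/test_model.py (get_config/assert_equal are module helpers).
    from lama_cleaner.model_manager import ModelManager
    from lama_cleaner.schema import HDStrategy

    model = ModelManager(name="fcf", device="cpu")   # "fcf" resolves to the FcF class added in this patch
    cfg = get_config(HDStrategy.ORIGINAL)            # FcF is only exercised with the ORIGINAL strategy here
    assert_equal(model, cfg, "fcf_Original_result.png", fx=2, fy=2)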