diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 73a840f7..c6aa73fa 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,23 +2,7 @@ - - - - - - - - - - - - - - - - - + diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 75b2cb0d..b9cc92cb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -53,11 +53,11 @@ repos: rev: 23.3.0 hooks: - id: black - - repo: https://github.com/codespell-project/codespell - rev: v2.2.4 - hooks: - - id: codespell - args: [-w] + #- repo: https://github.com/codespell-project/codespell + # rev: v2.2.4 + # hooks: + # - id: codespell + # args: [-w] - repo: https://github.com/PyCQA/flake8 rev: 6.0.0 hooks: diff --git a/LICENSE b/LICENSE index 0bcb3ed1..eabea607 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,10 @@ MIT License Copyright (c) 2023 34j and contributors +Copyright (c) 2022 NVIDIA CORPORATION. +Copyright (c) 2020 Edward Dixon +Copyright 2020 Alexandre Défossez +Copyright (c) 2023 yxlllc Copyright (c) 2021 Jingyi Li Permission is hereby granted, free of charge, to any person obtaining a copy @@ -221,3 +225,32 @@ SOFTWARE. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. +BSD 3-Clause License + +Copyright (c) 2019, Seungwon Park 박승원 +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. 
Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/poetry.lock b/poetry.lock index 5e927545..710e0ea3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -160,6 +160,17 @@ files = [ {file = "alabaster-0.7.13.tar.gz", hash = "sha256:a27a4a084d5e690e16e01e03ad2b2e552c61a65469419b907243193de1a84ae2"}, ] +[[package]] +name = "alias-free-torch" +version = "0.0.6" +description = "alias free torch" +category = "dev" +optional = false +python-versions = ">=3" +files = [ + {file = "alias_free_torch-0.0.6-py3-none-any.whl", hash = "sha256:3a77e81147caf00f0b05483498e672ad3623b05800b82ace163d7adecac8b033"}, +] + [[package]] name = "altair" version = "4.2.2" @@ -1013,6 +1024,18 @@ files = [ {file = "docutils-0.18.1.tar.gz", hash = "sha256:679987caf361a7539d76e584cbeddc311e3aee937877c87346f31debc63e9d06"}, ] +[[package]] +name = "einops" +version = "0.6.0" +description = "A new flavour of deep learning operations" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "einops-0.6.0-py3-none-any.whl", hash = 
"sha256:c7b187a5dc725f079860ec2d330c1820448948622d826273345a8dd8d5f695bd"}, + {file = "einops-0.6.0.tar.gz", hash = "sha256:6f6c78739316a2e3ccbce8052310497e69da092935e4173f2e76ec4e3a336a35"}, +] + [[package]] name = "email-validator" version = "1.3.1" @@ -1333,6 +1356,37 @@ smb = ["smbprotocol"] ssh = ["paramiko"] tqdm = ["tqdm"] +[[package]] +name = "gin" +version = "0.1.006" +description = "Git index file parser" +category = "dev" +optional = false +python-versions = "*" +files = [ + {file = "gin-0.1.006.tar.bz2", hash = "sha256:0747da840881792f1726f9145094953b0a1499e9b41324a14ca6a10c03baa1ef"}, +] + +[[package]] +name = "gin-config" +version = "0.5.0" +description = "Gin-Config: A lightweight configuration library for Python" +category = "dev" +optional = false +python-versions = "*" +files = [ + {file = "gin-config-0.5.0.tar.gz", hash = "sha256:0c6ea5026ded927c8c93c990b01c695257c1df446e45e549a158cfbc79e19ed6"}, + {file = "gin_config-0.5.0-py3-none-any.whl", hash = "sha256:bddb7ca221ea2b46cdb59321e79fecf02d6e3b728906047fcd4076c297609fd6"}, +] + +[package.extras] +pytorch-nightly = ["pytorch-nightly"] +tensorflow = ["tensorflow (>=1.13.0)"] +tensorflow-gpu = ["tensorflow-gpu (>=1.13.0)"] +testing = ["absl-py (>=0.1.6)", "mock (>=3.0.5)", "nose"] +tf-nightly = ["tf-nightly"] +torch = ["torch (>=1.3.0)"] + [[package]] name = "google-auth" version = "2.17.2" @@ -2182,6 +2236,22 @@ files = [ {file = "llvmlite-0.39.1.tar.gz", hash = "sha256:b43abd7c82e805261c425d50335be9a6c4f84264e34d6d6e475207300005d572"}, ] +[[package]] +name = "local-attention" +version = "1.8.5" +description = "Local attention, window with lookback, for language modeling" +category = "dev" +optional = false +python-versions = "*" +files = [ + {file = "local-attention-1.8.5.tar.gz", hash = "sha256:8de14fb051cfa8ded4e85f1223c5869b94c801b2ec932eedbeb4a8bc85df974e"}, + {file = "local_attention-1.8.5-py3-none-any.whl", hash = "sha256:24c24ed44d3199dce400fd6db468acfeee68e3a742dfe1fffd267e0708dd7112"}, 
+] + +[package.dependencies] +einops = ">=0.6.0" +torch = "*" + [[package]] name = "lxml" version = "4.9.2" @@ -5673,4 +5743,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.11" -content-hash = "5b33ef9ebc86cbbfbc5a0c514c774af5d719196328c6ce461b18d948a9abad21" +content-hash = "e829c9c315ebb001ad4a95a2ee95f9172454d598b542e9585e346cbf2581b53d" diff --git a/pyproject.toml b/pyproject.toml index d053111d..992b0199 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,16 @@ myst-parser = ">=0.16" sphinx = ">=4.0" sphinx-rtd-theme = ">=1.0" + +[tool.poetry.group.pc_ddsp.dependencies] +gin = "^0.1.6" +gin-config = "^0.5.0" +local-attention = "^1.8.5" + + +[tool.poetry.group.bigvgan.dependencies] +alias-free-torch = "^0.0.6" + [tool.semantic_release] branch = "main" version_toml = "pyproject.toml:tool.poetry.version" diff --git a/src/so_vits_svc_fork/dataset.py b/src/so_vits_svc_fork/dataset.py index 7aed7482..a25d5070 100644 --- a/src/so_vits_svc_fork/dataset.py +++ b/src/so_vits_svc_fork/dataset.py @@ -64,7 +64,7 @@ def _pad_stack(array: Sequence[torch.Tensor]) -> torch.Tensor: class TextAudioCollate(nn.Module): def forward( self, batch: Sequence[dict[str, torch.Tensor]] - ) -> tuple[torch.Tensor, ...]: + ) -> dict[str, torch.Tensor]: batch = [b for b in batch if b is not None] batch = list(sorted(batch, key=lambda x: x["mel_spec"].shape[1], reverse=True)) lengths = torch.tensor([b["mel_spec"].shape[1] for b in batch]).long() @@ -74,14 +74,5 @@ def forward( results[key] = _pad_stack([b[key] for b in batch]).cpu() else: results[key] = torch.tensor([[b[key]] for b in batch]).cpu() - - return ( - results["content"], - results["f0"], - results["spec"], - results["mel_spec"], - results["audio"], - results["spk"], - lengths, - results["uv"], - ) + results["length"] = lengths + return results diff --git a/src/so_vits_svc_fork/f0.py b/src/so_vits_svc_fork/f0.py index 
d044ddd1..6b84ddb6 100644 --- a/src/so_vits_svc_fork/f0.py +++ b/src/so_vits_svc_fork/f0.py @@ -8,7 +8,7 @@ import torchcrepe from cm_time import timer from numpy import dtype, float32, ndarray -from torch import FloatTensor, Tensor +from torch import Tensor from so_vits_svc_fork.utils import get_optimal_device @@ -16,8 +16,8 @@ def normalize_f0( - f0: FloatTensor, x_mask: FloatTensor, uv: FloatTensor, random_scale=True -) -> FloatTensor: + f0: Tensor, x_mask: Tensor, uv: Tensor, random_scale: bool = True +) -> Tensor: # calculate means based on x_mask uv_sum = torch.sum(uv, dim=1, keepdim=True) uv_sum[uv_sum == 0] = 9999 diff --git a/src/so_vits_svc_fork/inference/core.py b/src/so_vits_svc_fork/inference/core.py index 314c790b..0c2ddbc4 100644 --- a/src/so_vits_svc_fork/inference/core.py +++ b/src/so_vits_svc_fork/inference/core.py @@ -216,10 +216,10 @@ def infer( audio = self.net_g.infer( c, f0=f0, - g=sid, + spk=sid, uv=uv, predict_f0=auto_predict_f0, - noice_scale=noise_scale, + noise_scale=noise_scale, )[0, 0].data.float() audio_duration = audio.shape[-1] / self.target_sample LOG.info( diff --git a/src/so_vits_svc_fork/modules/attentions.py b/src/so_vits_svc_fork/modules/attentions.py index aeaf40fd..c3985a21 100644 --- a/src/so_vits_svc_fork/modules/attentions.py +++ b/src/so_vits_svc_fork/modules/attentions.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import math import torch -from torch import nn +from torch import Tensor, nn from torch.nn import functional as F from so_vits_svc_fork.modules import commons @@ -11,15 +13,14 @@ class FFT(nn.Module): def __init__( self, - hidden_channels, - filter_channels, - n_heads, - n_layers=1, - kernel_size=1, - p_dropout=0.0, - proximal_bias=False, - proximal_init=True, - **kwargs + hidden_channels: int, + filter_channels: int, + n_heads: int, + n_layers: int = 1, + kernel_size: int = 1, + p_dropout: float = 0.0, + proximal_bias: bool = False, + proximal_init: bool = True, ): super().__init__() 
self.hidden_channels = hidden_channels @@ -60,7 +61,7 @@ def __init__( ) self.norm_layers_1.append(LayerNorm(hidden_channels)) - def forward(self, x, x_mask): + def forward(self, x: Tensor, x_mask: Tensor) -> Tensor: """ x: decoder input h: encoder output @@ -84,14 +85,13 @@ def forward(self, x, x_mask): class Encoder(nn.Module): def __init__( self, - hidden_channels, - filter_channels, - n_heads, - n_layers, - kernel_size=1, - p_dropout=0.0, - window_size=4, - **kwargs + hidden_channels: int, + filter_channels: int, + n_heads: int, + n_layers: int, + kernel_size: int = 1, + p_dropout: float = 0.0, + window_size: int = 4, ): super().__init__() self.hidden_channels = hidden_channels @@ -129,7 +129,7 @@ def __init__( ) self.norm_layers_2.append(LayerNorm(hidden_channels)) - def forward(self, x, x_mask): + def forward(self, x: Tensor, x_mask: Tensor) -> Tensor: attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1) x = x * x_mask for i in range(self.n_layers): @@ -147,16 +147,15 @@ def forward(self, x, x_mask): class Decoder(nn.Module): def __init__( self, - hidden_channels, - filter_channels, - n_heads, - n_layers, - kernel_size=1, - p_dropout=0.0, - proximal_bias=False, - proximal_init=True, - **kwargs - ): + hidden_channels: int, + filter_channels: int, + n_heads: int, + n_layers: int, + kernel_size: int = 1, + p_dropout: float = 0.0, + proximal_bias: bool = False, + proximal_init: bool = True, + ) -> None: super().__init__() self.hidden_channels = hidden_channels self.filter_channels = filter_channels @@ -204,7 +203,7 @@ def __init__( ) self.norm_layers_2.append(LayerNorm(hidden_channels)) - def forward(self, x, x_mask, h, h_mask): + def forward(self, x: Tensor, x_mask: Tensor, h: Tensor, h_mask: Tensor) -> Tensor: """ x: decoder input h: encoder output @@ -233,15 +232,15 @@ def forward(self, x, x_mask, h, h_mask): class MultiHeadAttention(nn.Module): def __init__( self, - channels, - out_channels, - n_heads, - p_dropout=0.0, - window_size=None, - 
heads_share=True, - block_length=None, - proximal_bias=False, - proximal_init=False, + channels: int, + out_channels: int, + n_heads: int, + p_dropout: float = 0.0, + window_size: int = None, + heads_share: bool = True, + block_length: int = None, + proximal_bias: bool = False, + proximal_init: bool = False, ): super().__init__() assert channels % n_heads == 0 @@ -284,7 +283,7 @@ def __init__( self.conv_k.weight.copy_(self.conv_q.weight) self.conv_k.bias.copy_(self.conv_q.bias) - def forward(self, x, c, attn_mask=None): + def forward(self, x: Tensor, c: Tensor, attn_mask: Tensor | None = None) -> Tensor: q = self.conv_q(x) k = self.conv_k(c) v = self.conv_v(c) @@ -294,7 +293,9 @@ def forward(self, x, c, attn_mask=None): x = self.conv_o(x) return x - def attention(self, query, key, value, mask=None): + def attention( + self, query: Tensor, key: Tensor, value: Tensor, mask: Tensor | None = None + ) -> Tensor: # reshape [b, d, t] -> [b, n_h, t, d_k] b, d, t_s, t_t = (*key.size(), query.size(2)) query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3) @@ -433,13 +434,13 @@ def _attention_bias_proximal(self, length): class FFN(nn.Module): def __init__( self, - in_channels, - out_channels, - filter_channels, - kernel_size, - p_dropout=0.0, - activation=None, - causal=False, + in_channels: int, + out_channels: int, + filter_channels: int, + kernel_size: int, + p_dropout: float = 0.0, + activation: nn.Module | None = None, + causal: bool = False, ): super().__init__() self.in_channels = in_channels diff --git a/src/so_vits_svc_fork/modules/decoders/bigvgan/__init__.py b/src/so_vits_svc_fork/modules/decoders/bigvgan/__init__.py new file mode 100644 index 00000000..464d78fb --- /dev/null +++ b/src/so_vits_svc_fork/modules/decoders/bigvgan/__init__.py @@ -0,0 +1,3 @@ +from ._models import BigVGAN + +__all__ = ["BigVGAN"] diff --git a/src/so_vits_svc_fork/modules/decoders/bigvgan/_activations.py b/src/so_vits_svc_fork/modules/decoders/bigvgan/_activations.py 
# Implementation adapted from https://github.com/EdwardDixon/snake under the MIT license.
# LICENSE is in incl_licenses directory.

import torch
from torch import nn, pow, sin
from torch.nn import Parameter


class Snake(nn.Module):
    """Sine-based periodic activation: ``x + (1 / alpha) * sin^2(alpha * x)``.

    Shape:
        - Input: (B, C, T)
        - Output: (B, C, T), same shape as the input

    Parameters:
        - alpha: per-channel parameter of shape ``(in_features,)``

    References:
        - This activation function is from this paper by Liu Ziyin,
          Tilman Hartwig, Masahito Ueda: https://arxiv.org/abs/2006.08195

    Examples:
        >>> act = Snake(256)
        >>> x = torch.randn(4, 256, 100)
        >>> act(x).shape
        torch.Size([4, 256, 100])
    """

    def __init__(
        self,
        in_features: int,
        alpha: float = 1.0,
        alpha_trainable: bool = True,
        alpha_logscale: bool = False,
    ):
        """Create the activation.

        Args:
            in_features: number of channels (size of the ``C`` axis).
            alpha: initial value of every ``alpha`` entry in linear-scale mode;
                higher values give a higher frequency.
            alpha_trainable: whether ``alpha`` receives gradients.
            alpha_logscale: store ``alpha`` in log scale and exponentiate it in
                :meth:`forward`.

        Note:
            In log-scale mode the stored value is ``zeros * alpha``, i.e. always
            zero, so the effective initial alpha is ``exp(0) == 1`` whatever
            ``alpha`` is passed.
        """
        super().__init__()
        self.in_features = in_features
        self.alpha_logscale = alpha_logscale

        # log scale -> zeros, linear scale -> ones (both multiplied by `alpha`)
        make_base = torch.zeros if alpha_logscale else torch.ones
        self.alpha = Parameter(
            make_base(in_features) * alpha, requires_grad=alpha_trainable
        )

        # added to the denominator so that alpha == 0 cannot divide by zero
        self.no_div_by_zero = 0.000000001

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply ``x + 1/a * sin^2(x * a)`` elementwise."""
        # (C,) -> (1, C, 1) so that it broadcasts against [B, C, T]
        alpha = self.alpha.unsqueeze(0).unsqueeze(-1)
        if self.alpha_logscale:
            alpha = torch.exp(alpha)
        return x + (1.0 / (alpha + self.no_div_by_zero)) * pow(sin(x * alpha), 2)


class SnakeBeta(nn.Module):
    """Snake variant with a separate magnitude parameter.

    Computes ``x + (1 / beta) * sin^2(alpha * x)``.

    Shape:
        - Input: (B, C, T)
        - Output: (B, C, T), same shape as the input

    Parameters:
        - alpha: per-channel parameter that controls frequency
        - beta: per-channel parameter that controls magnitude

    References:
        - Modified version based on this paper by Liu Ziyin, Tilman Hartwig,
          Masahito Ueda: https://arxiv.org/abs/2006.08195

    Examples:
        >>> act = SnakeBeta(256)
        >>> x = torch.randn(4, 256, 100)
        >>> act(x).shape
        torch.Size([4, 256, 100])
    """

    def __init__(
        self,
        in_features: int,
        alpha: float = 1.0,
        alpha_trainable: bool = True,
        alpha_logscale: bool = False,
    ):
        """Create the activation.

        Args:
            in_features: number of channels (size of the ``C`` axis).
            alpha: initial value of every ``alpha`` AND ``beta`` entry in
                linear-scale mode.
            alpha_trainable: whether ``alpha`` and ``beta`` receive gradients.
            alpha_logscale: store both parameters in log scale and exponentiate
                them in :meth:`forward`.

        Note:
            In log-scale mode both parameters start at zero (``zeros * alpha``),
            so the ``alpha`` argument has no effect on the initial value.
        """
        super().__init__()
        self.in_features = in_features
        self.alpha_logscale = alpha_logscale

        make_base = torch.zeros if alpha_logscale else torch.ones
        self.alpha = Parameter(
            make_base(in_features) * alpha, requires_grad=alpha_trainable
        )
        self.beta = Parameter(
            make_base(in_features) * alpha, requires_grad=alpha_trainable
        )

        # added to the denominator so that beta == 0 cannot divide by zero
        self.no_div_by_zero = 0.000000001

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply ``x + 1/b * sin^2(x * a)`` elementwise."""
        # (C,) -> (1, C, 1) so that both broadcast against [B, C, T]
        alpha = self.alpha.unsqueeze(0).unsqueeze(-1)
        beta = self.beta.unsqueeze(0).unsqueeze(-1)
        if self.alpha_logscale:
            alpha = torch.exp(alpha)
            beta = torch.exp(beta)
        return x + (1.0 / (beta + self.no_div_by_zero)) * pow(sin(x * alpha), 2)
# Copyright (c) 2022 NVIDIA CORPORATION.
#   Licensed under the MIT license.

# Adapted from https://github.com/jik876/hifi-gan under the MIT license.
#   LICENSE is in incl_licenses directory.

from logging import getLogger

import torch
import torch.nn as nn
from alias_free_torch import Activation1d
from torch.nn import Conv1d, ConvTranspose1d
from torch.nn.utils import remove_weight_norm, weight_norm

from ._activations import Snake, SnakeBeta
from ._utils import get_padding, init_weights

LOG = getLogger(__name__)

LRELU_SLOPE = 0.1

_BAD_ACTIVATION_MSG = (
    "activation incorrectly specified. "
    "check the config file and look for 'activation'."
)


def _build_activation(h, channels, activation):
    """Return one anti-aliased periodic activation for ``channels`` channels.

    Raises:
        NotImplementedError: if ``activation`` is neither "snake" nor "snakebeta".
    """
    if activation == "snake":
        periodic = Snake(channels, alpha_logscale=h.snake_logscale)
    elif activation == "snakebeta":
        periodic = SnakeBeta(channels, alpha_logscale=h.snake_logscale)
    else:
        raise NotImplementedError(_BAD_ACTIVATION_MSG)
    return Activation1d(activation=periodic)


def _same_length_conv(channels, kernel_size, dilation):
    """Return a weight-normed stride-1 Conv1d padded via ``get_padding``."""
    return weight_norm(
        Conv1d(
            channels,
            channels,
            kernel_size,
            1,
            dilation=dilation,
            padding=get_padding(kernel_size, dilation),
        )
    )


class AMPBlock1(torch.nn.Module):
    """Residual block of three (dilated conv, dilation-1 conv) pairs.

    Every convolution is preceded by its own periodic activation; the output of
    each pair is added back to its input.
    """

    def __init__(self, h, channels, kernel_size=3, dilation=(1, 3, 5), activation=None):
        """
        Args:
            h: config object; ``h.snake_logscale`` is read here.
            channels: channel count of every convolution.
            kernel_size: kernel size of every convolution.
            dilation: the first three entries are the dilations of ``convs1``.
            activation: "snake" or "snakebeta".

        Raises:
            NotImplementedError: for any other ``activation`` value.
        """
        super().__init__()
        self.h = h

        self.convs1 = nn.ModuleList(
            [
                _same_length_conv(channels, kernel_size, dilation[0]),
                _same_length_conv(channels, kernel_size, dilation[1]),
                _same_length_conv(channels, kernel_size, dilation[2]),
            ]
        )
        self.convs1.apply(init_weights)

        self.convs2 = nn.ModuleList(
            [_same_length_conv(channels, kernel_size, 1) for _ in range(3)]
        )
        self.convs2.apply(init_weights)

        # total number of conv layers; one activation is built per conv
        self.num_layers = len(self.convs1) + len(self.convs2)
        self.activations = nn.ModuleList(
            [
                _build_activation(h, channels, activation)
                for _ in range(self.num_layers)
            ]
        )

    def forward(self, x):
        # even-indexed activations feed convs1, odd-indexed ones feed convs2
        acts1, acts2 = self.activations[::2], self.activations[1::2]
        for conv1, conv2, act1, act2 in zip(self.convs1, self.convs2, acts1, acts2):
            residual = conv2(act2(conv1(act1(x))))
            x = residual + x
        return x

    def remove_weight_norm(self):
        """Strip weight normalization from every convolution of the block."""
        for conv in self.convs1:
            remove_weight_norm(conv)
        for conv in self.convs2:
            remove_weight_norm(conv)


class AMPBlock2(torch.nn.Module):
    """Residual block of two dilated convolutions, each behind its own activation."""

    def __init__(self, h, channels, kernel_size=3, dilation=(1, 3), activation=None):
        """
        Args:
            h: config object; ``h.snake_logscale`` is read here.
            channels: channel count of every convolution.
            kernel_size: kernel size of every convolution.
            dilation: the first two entries are the dilations of ``convs``.
            activation: "snake" or "snakebeta".

        Raises:
            NotImplementedError: for any other ``activation`` value.
        """
        super().__init__()
        self.h = h

        self.convs = nn.ModuleList(
            [
                _same_length_conv(channels, kernel_size, dilation[0]),
                _same_length_conv(channels, kernel_size, dilation[1]),
            ]
        )
        self.convs.apply(init_weights)

        self.num_layers = len(self.convs)  # total number of conv layers
        self.activations = nn.ModuleList(
            [
                _build_activation(h, channels, activation)
                for _ in range(self.num_layers)
            ]
        )

    def forward(self, x):
        for conv, act in zip(self.convs, self.activations):
            x = conv(act(x)) + x
        return x

    def remove_weight_norm(self):
        """Strip weight normalization from every convolution of the block."""
        for conv in self.convs:
            remove_weight_norm(conv)


class BigVGAN(torch.nn.Module):
    """Main BigVGAN model; applies anti-aliased periodic activation in resblocks.

    Reads from the config object ``h``: ``num_mels``, ``upsample_initial_channel``,
    ``upsample_rates``, ``upsample_kernel_sizes``, ``resblock``,
    ``resblock_kernel_sizes``, ``resblock_dilation_sizes``, ``activation`` and
    ``snake_logscale``.
    """

    def __init__(self, h):
        super().__init__()
        self.h = h

        self.num_kernels = len(h.resblock_kernel_sizes)
        self.num_upsamples = len(h.upsample_rates)

        # pre conv
        self.conv_pre = weight_norm(
            Conv1d(h.num_mels, h.upsample_initial_channel, 7, 1, padding=3)
        )

        # define which AMPBlock to use. BigVGAN uses AMPBlock1 as default
        resblock = AMPBlock1 if h.resblock == "1" else AMPBlock2

        # transposed conv-based upsamplers. does not apply anti-aliasing
        self.ups = nn.ModuleList()
        for i, (rate, kernel) in enumerate(
            zip(h.upsample_rates, h.upsample_kernel_sizes)
        ):
            self.ups.append(
                nn.ModuleList(
                    [
                        weight_norm(
                            ConvTranspose1d(
                                h.upsample_initial_channel // (2**i),
                                h.upsample_initial_channel // (2 ** (i + 1)),
                                kernel,
                                rate,
                                padding=(kernel - rate) // 2,
                            )
                        )
                    ]
                )
            )

        # residual blocks using anti-aliased multi-periodicity composition
        # modules (AMP). `ch` is pre-seeded so the post layers below can still be
        # built when there is no upsampling stage (it used to be unbound then).
        ch = h.upsample_initial_channel
        self.resblocks = nn.ModuleList()
        for i in range(len(self.ups)):
            ch = h.upsample_initial_channel // (2 ** (i + 1))
            for kernel, dilation in zip(
                h.resblock_kernel_sizes, h.resblock_dilation_sizes
            ):
                self.resblocks.append(
                    resblock(h, ch, kernel, dilation, activation=h.activation)
                )

        # post conv, preceded by one more anti-aliased periodic activation
        self.activation_post = _build_activation(h, ch, h.activation)
        self.conv_post = weight_norm(Conv1d(ch, 1, 7, 1, padding=3))

        # weight initialization
        for up in self.ups:
            up.apply(init_weights)
        self.conv_post.apply(init_weights)

    def forward(self, x):
        # pre conv
        x = self.conv_pre(x)

        for i in range(self.num_upsamples):
            # upsampling
            for up in self.ups[i]:
                x = up(x)
            # AMP blocks: average the outputs of this stage's resblocks
            first = i * self.num_kernels
            xs = None
            for block in self.resblocks[first : first + self.num_kernels]:
                out = block(x)
                xs = out if xs is None else xs + out
            x = xs / self.num_kernels

        # post conv
        x = self.activation_post(x)
        x = self.conv_post(x)
        return torch.tanh(x)

    def remove_weight_norm(self):
        """Strip weight normalization from every layer of the model."""
        LOG.info("Removing weight norm...")
        for stage in self.ups:
            for up in stage:
                remove_weight_norm(up)
        for block in self.resblocks:
            block.remove_weight_norm()
        remove_weight_norm(self.conv_pre)
        remove_weight_norm(self.conv_post)
def init_weights(m, mean: float = 0.0, std: float = 0.01) -> None:
    """Re-draw ``m.weight`` from N(mean, std) when ``m`` is a convolution.

    A module counts as a convolution when its class name contains "Conv";
    every other module is left untouched. Meant to be passed to
    ``nn.Module.apply``.
    """
    if "Conv" in m.__class__.__name__:
        m.weight.data.normal_(mean, std)


def get_padding(kernel_size: int, dilation: int = 1) -> int:
    """Return ``(kernel_size * dilation - dilation) / 2`` truncated to an int."""
    return int((kernel_size * dilation - dilation) / 2)
@@ -0,0 +1,3 @@ +from ._vocoder import CombSub, CombSubFast, Sins, VolumeExtractor + +__all__ = ["CombSub", "CombSubFast", "Sins", "VolumeExtractor"] diff --git a/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_core.py b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_core.py new file mode 100644 index 00000000..b07ef294 --- /dev/null +++ b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_core.py @@ -0,0 +1,270 @@ +import numpy as np +import torch +import torch.nn as nn +from torch.nn import functional as F + + +def get_fft_size(frame_size: int, ir_size: int, power_of_2: bool = True): + """Calculate final size for efficient FFT. + Args: + frame_size: Size of the audio frame. + ir_size: Size of the convolving impulse response. + power_of_2: Constrain to be a power of 2. If False, allow other 5-smooth + numbers. TPU requires power of 2, while GPU is more flexible. + Returns: + fft_size: Size for efficient FFT. + """ + convolved_frame_size = ir_size + frame_size - 1 + if power_of_2: + # Next power of 2. + fft_size = int(2 ** np.ceil(np.log2(convolved_frame_size))) + else: + fft_size = convolved_frame_size + return fft_size + + +def upsample(signal, factor): + signal = signal.permute(0, 2, 1) + signal = nn.functional.interpolate( + torch.cat((signal, signal[:, :, -1:]), 2), + size=signal.shape[-1] * factor + 1, + mode="linear", + align_corners=True, + ) + signal = signal[:, :, :-1] + return signal.permute(0, 2, 1) + + +def remove_above_fmax(amplitudes, pitch, fmax, level_start=1): + n_harm = amplitudes.shape[-1] + pitches = pitch * torch.arange(level_start, n_harm + level_start).to(pitch) + aa = (pitches < fmax).float() + 1e-7 + return amplitudes * aa + + +def crop_and_compensate_delay( + audio, audio_size, ir_size, padding="same", delay_compensation=-1 +): + """Crop audio output from convolution to compensate for group delay. + Args: + audio: Audio after convolution. Tensor of shape [batch, time_steps]. + audio_size: Initial size of the audio before convolution. 
+ ir_size: Size of the convolving impulse response. + padding: Either 'valid' or 'same'. For 'same' the final output to be the + same size as the input audio (audio_timesteps). For 'valid' the audio is + extended to include the tail of the impulse response (audio_timesteps + + ir_timesteps - 1). + delay_compensation: Samples to crop from start of output audio to compensate + for group delay of the impulse response. If delay_compensation < 0 it + defaults to automatically calculating a constant group delay of the + windowed linear phase filter from frequency_impulse_response(). + Returns: + Tensor of cropped and shifted audio. + Raises: + ValueError: If padding is not either 'valid' or 'same'. + """ + # Crop the output. + if padding == "valid": + crop_size = ir_size + audio_size - 1 + elif padding == "same": + crop_size = audio_size + else: + raise ValueError( + "Padding must be 'valid' or 'same', instead " "of {}.".format(padding) + ) + + # Compensate for the group delay of the filter by trimming the front. + # For an impulse response produced by frequency_impulse_response(), + # the group delay is constant because the filter is linear phase. + total_size = int(audio.shape[-1]) + crop = total_size - crop_size + start = ir_size // 2 if delay_compensation < 0 else delay_compensation + end = crop - start + return audio[:, start:-end] + + +def fft_convolve(audio, impulse_response): # B, n_frames, 2*(n_mags-1) + """Filter audio with frames of time-varying impulse responses. + Time-varying filter. Given audio [batch, n_samples], and a series of impulse + responses [batch, n_frames, n_impulse_response], splits the audio into frames, + applies filters, and then overlap-and-adds audio back together. + Applies non-windowed non-overlapping STFT/ISTFT to efficiently compute + convolution for large impulse response sizes. + Args: + audio: Input audio. Tensor of shape [batch, audio_timesteps]. + impulse_response: Finite impulse response to convolve. 
Can either be a 2-D + Tensor of shape [batch, ir_size], or a 3-D Tensor of shape [batch, + ir_frames, ir_size]. A 2-D tensor will apply a single linear + time-invariant filter to the audio. A 3-D Tensor will apply a linear + time-varying filter. Automatically chops the audio into equally shaped + blocks to match ir_frames. + Returns: + audio_out: Convolved audio. Tensor of shape + [batch, audio_timesteps]. + """ + # Add a frame dimension to impulse response if it doesn't have one. + ir_shape = impulse_response.size() + if len(ir_shape) == 2: + impulse_response = impulse_response.unsqueeze(1) + ir_shape = impulse_response.size() + + # Get shapes of audio and impulse response. + batch_size_ir, n_ir_frames, ir_size = ir_shape + batch_size, audio_size = audio.size() # B, T + + # Validate that batch sizes match. + if batch_size != batch_size_ir: + raise ValueError( + "Batch size of audio ({}) and impulse response ({}) must " + "be the same.".format(batch_size, batch_size_ir) + ) + + # Cut audio into 50% overlapped frames (center padding). + hop_size = int(audio_size / n_ir_frames) + frame_size = 2 * hop_size + audio_frames = F.pad(audio, (hop_size, hop_size)).unfold(1, frame_size, hop_size) + + # Apply Bartlett (triangular) window + window = torch.bartlett_window(frame_size).to(audio_frames) + audio_frames = audio_frames * window + + # Pad and FFT the audio and impulse responses. + fft_size = get_fft_size(frame_size, ir_size, power_of_2=False) + audio_fft = torch.fft.rfft(audio_frames, fft_size) + ir_fft = torch.fft.rfft( + torch.cat((impulse_response, impulse_response[:, -1:, :]), 1), fft_size + ) + + # Multiply the FFTs (same as convolution in time). + audio_ir_fft = torch.multiply(audio_fft, ir_fft) + + # Take the IFFT to resynthesize audio. 
+ audio_frames_out = torch.fft.irfft(audio_ir_fft, fft_size) + + # Overlap Add + ( + batch_size, + n_audio_frames, + frame_size, + ) = audio_frames_out.size() # # B, n_frames+1, 2*(hop_size+n_mags-1)-1 + fold = torch.nn.Fold( + output_size=(1, (n_audio_frames - 1) * hop_size + frame_size), + kernel_size=(1, frame_size), + stride=(1, hop_size), + ) + output_signal = fold(audio_frames_out.transpose(1, 2)).squeeze(1).squeeze(1) + + # Crop and shift the output audio. + output_signal = crop_and_compensate_delay( + output_signal[:, hop_size:], audio_size, ir_size + ) + return output_signal + + +def apply_window_to_impulse_response( + impulse_response, # B, n_frames, 2*(n_mag-1) + window_size: int = 0, + causal: bool = False, +): + """Apply a window to an impulse response and put in causal form. + Args: + impulse_response: A series of impulse responses frames to window, of shape + [batch, n_frames, ir_size]. ---------> ir_size means size of filter_bank ?????? + + window_size: Size of the window to apply in the time domain. If window_size + is less than 1, it defaults to the impulse_response size. + causal: Impulse response input is in causal form (peak in the middle). + Returns: + impulse_response: Windowed impulse response in causal form, with last + dimension cropped to window_size if window_size is greater than 0 and less + than ir_size. + """ + + # If IR is in causal form, put it in zero-phase form. + if causal: + impulse_response = torch.fftshift(impulse_response, axes=-1) + + # Get a window for better time/frequency resolution than rectangular. + # Window defaults to IR size, cannot be bigger. + ir_size = int(impulse_response.size(-1)) + if (window_size <= 0) or (window_size > ir_size): + window_size = ir_size + window = nn.Parameter(torch.hann_window(window_size), requires_grad=False).to( + impulse_response + ) + + # Zero pad the window and put in in zero-phase form. 
+ padding = ir_size - window_size + if padding > 0: + half_idx = (window_size + 1) // 2 + window = torch.cat( + [window[half_idx:], torch.zeros([padding]), window[:half_idx]], axis=0 + ) + else: + window = window.roll(window.size(-1) // 2, -1) + + # Apply the window, to get new IR (both in zero-phase form). + window = window.unsqueeze(0) + impulse_response = impulse_response * window + + # Put IR in causal form and trim zero padding. + if padding > 0: + first_half_start = (ir_size - (half_idx - 1)) + 1 + second_half_end = half_idx + 1 + impulse_response = torch.cat( + [ + impulse_response[..., first_half_start:], + impulse_response[..., :second_half_end], + ], + dim=-1, + ) + else: + impulse_response = impulse_response.roll(impulse_response.size(-1) // 2, -1) + + return impulse_response + + +def apply_dynamic_window_to_impulse_response( + impulse_response, half_width_frames # B, n_frames, 2*(n_mag-1) or 2*n_mag-1 +): # B,n_frames, 1 + ir_size = int(impulse_response.size(-1)) # 2*(n_mag -1) or 2*n_mag-1 + + window = ( + torch.arange(-(ir_size // 2), (ir_size + 1) // 2).to(impulse_response) + / half_width_frames + ) + window[window > 1] = 0 + window = ( + 1 + torch.cos(np.pi * window) + ) / 2 # B, n_frames, 2*(n_mag -1) or 2*n_mag-1 + + impulse_response = impulse_response.roll(ir_size // 2, -1) + impulse_response = impulse_response * window + + return impulse_response + + +def frequency_impulse_response(magnitudes, hann_window=True, half_width_frames=None): + # Get the IR + impulse_response = torch.fft.irfft(magnitudes) # B, n_frames, 2*(n_mags-1) + + # Window and put in causal form. 
+ if hann_window: + if half_width_frames is None: + impulse_response = apply_window_to_impulse_response(impulse_response) + else: + impulse_response = apply_dynamic_window_to_impulse_response( + impulse_response, half_width_frames + ) + else: + impulse_response = impulse_response.roll(impulse_response.size(-1) // 2, -1) + + return impulse_response + + +def frequency_filter(audio, magnitudes, hann_window=True, half_width_frames=None): + impulse_response = frequency_impulse_response( + magnitudes, hann_window, half_width_frames + ) + + return fft_convolve(audio, impulse_response) diff --git a/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_loss.py b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_loss.py new file mode 100644 index 00000000..f325af3c --- /dev/null +++ b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_loss.py @@ -0,0 +1,63 @@ +import torch +import torch.nn as nn +import torchaudio +from torch.nn import functional as F + + +class SSSLoss(nn.Module): + """ + Single-scale Spectral Loss. + """ + + def __init__(self, n_fft=111, alpha=1.0, overlap=0, eps=1e-7): + super().__init__() + self.n_fft = n_fft + self.alpha = alpha + self.eps = eps + self.hop_length = int(n_fft * (1 - overlap)) # 25% of the length + self.spec = torchaudio.transforms.Spectrogram( + n_fft=self.n_fft, + hop_length=self.hop_length, + power=1, + normalized=True, + center=False, + ) + + def forward(self, x_true, x_pred): + S_true = self.spec(x_true) + self.eps + S_pred = self.spec(x_pred) + self.eps + + converge_term = torch.mean( + torch.linalg.norm(S_true - S_pred, dim=(1, 2)) + / torch.linalg.norm(S_true + S_pred, dim=(1, 2)) + ) + + log_term = F.l1_loss(S_true.log(), S_pred.log()) + + loss = converge_term + self.alpha * log_term + return loss + + +class RSSLoss(nn.Module): + """ + Random-scale Spectral Loss. 
+ """ + + def __init__( + self, fft_min, fft_max, n_scale, alpha=1.0, overlap=0, eps=1e-7, device="cuda" + ): + super().__init__() + self.fft_min = fft_min + self.fft_max = fft_max + self.n_scale = n_scale + self.lossdict = {} + for n_fft in range(fft_min, fft_max): + self.lossdict[n_fft] = SSSLoss(n_fft, alpha, overlap, eps).to(device) + + def forward(self, x_pred, x_true): + value = 0.0 + n_ffts = torch.randint(self.fft_min, self.fft_max, (self.n_scale,)) + for n_fft in n_ffts: + loss_func = self.lossdict[int(n_fft)] + value += loss_func(x_true, x_pred) + return value / self.n_scale diff --git a/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_pcmer.py b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_pcmer.py new file mode 100644 index 00000000..a7ff4704 --- /dev/null +++ b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_pcmer.py @@ -0,0 +1,484 @@ +import math +from functools import partial + +import torch +import torch.nn.functional as F +from einops import rearrange, repeat +from local_attention import LocalAttention +from torch import nn + +# import fast_transformers.causal_product.causal_product_cuda + + +def softmax_kernel( + data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None +): + b, h, *_ = data.shape + # (batch size, head, length, model_dim) + + # normalize model dim + data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0 + + # what is ration?, projection_matrix.shape[0] --> 266 + + ratio = projection_matrix.shape[0] ** -0.5 + + projection = repeat(projection_matrix, "j d -> b h j d", b=b, h=h) + projection = projection.type_as(data) + + # data_dash = w^T x + data_dash = torch.einsum("...id,...jd->...ij", (data_normalizer * data), projection) + + # diag_data = D**2 + diag_data = data**2 + diag_data = torch.sum(diag_data, dim=-1) + diag_data = (diag_data / 2.0) * (data_normalizer**2) + diag_data = diag_data.unsqueeze(dim=-1) + + # print () + if is_query: + data_dash = ratio * ( + torch.exp( + data_dash + - 
diag_data + - torch.max(data_dash, dim=-1, keepdim=True).values + ) + + eps + ) + else: + data_dash = ratio * ( + torch.exp(data_dash - diag_data + eps) + ) # - torch.max(data_dash)) + eps) + + return data_dash.type_as(data) + + +def orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None): + unstructured_block = torch.randn((cols, cols), device=device) + q, r = torch.linalg.qr(unstructured_block.cpu(), mode="reduced") + q, r = map(lambda t: t.to(device), (q, r)) + + # proposed by @Parskatt + # to make sure Q is uniform https://arxiv.org/pdf/math-ph/0609050.pdf + if qr_uniform_q: + d = torch.diag(r, 0) + q *= d.sign() + return q.t() + + +def exists(val): + return val is not None + + +def empty(tensor): + return tensor.numel() == 0 + + +def default(val, d): + return val if exists(val) else d + + +def cast_tuple(val): + return (val,) if not isinstance(val, tuple) else val + + +class PCmer(nn.Module): + """The encoder that is used in the Transformer model.""" + + def __init__( + self, + num_layers, + num_heads, + dim_model, + dim_keys, + dim_values, + residual_dropout, + attention_dropout, + ): + super().__init__() + self.num_layers = num_layers + self.num_heads = num_heads + self.dim_model = dim_model + self.dim_values = dim_values + self.dim_keys = dim_keys + self.residual_dropout = residual_dropout + self.attention_dropout = attention_dropout + + self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)]) + + # METHODS ######################################################################################################## + + def forward(self, phone, mask=None): + # apply all layers to the input + for i, layer in enumerate(self._layers): + phone = layer(phone, mask) + # provide the final sequence + return phone + + +# ==================================================================================================================== # +# CLASS _ E N C O D E R L A Y E R # +# 
==================================================================================================================== # + + +class _EncoderLayer(nn.Module): + """One layer of the encoder. + + Attributes: + attn: (:class:`mha.MultiHeadAttention`): The attention mechanism that is used to read the input sequence. + feed_forward (:class:`ffl.FeedForwardLayer`): The feed-forward layer on top of the attention mechanism. + """ + + def __init__(self, parent: PCmer): + """Creates a new instance of ``_EncoderLayer``. + + Args: + parent (Encoder): The encoder that the layers is created for. + """ + super().__init__() + + self.conformer = ConformerConvModule(parent.dim_model) + self.norm = nn.LayerNorm(parent.dim_model) + self.dropout = nn.Dropout(parent.residual_dropout) + + # selfatt -> fastatt: performer! + self.attn = SelfAttention( + dim=parent.dim_model, heads=parent.num_heads, causal=False + ) + + # METHODS ######################################################################################################## + + def forward(self, phone, mask=None): + # compute attention sub-layer + phone = phone + (self.attn(self.norm(phone), mask=mask)) + + phone = phone + (self.conformer(phone)) + + return phone + + +def calc_same_padding(kernel_size): + pad = kernel_size // 2 + return (pad, pad - (kernel_size + 1) % 2) + + +# helper classes + + +class Swish(nn.Module): + def forward(self, x): + return x * x.sigmoid() + + +class Transpose(nn.Module): + def __init__(self, dims): + super().__init__() + assert len(dims) == 2, "dims must be a tuple of two dimensions" + self.dims = dims + + def forward(self, x): + return x.transpose(*self.dims) + + +class GLU(nn.Module): + def __init__(self, dim): + super().__init__() + self.dim = dim + + def forward(self, x): + out, gate = x.chunk(2, dim=self.dim) + return out * gate.sigmoid() + + +class DepthWiseConv1d(nn.Module): + def __init__(self, chan_in, chan_out, kernel_size, padding): + super().__init__() + self.padding = padding + self.conv = 
nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in) + + def forward(self, x): + x = F.pad(x, self.padding) + return self.conv(x) + + +class ConformerConvModule(nn.Module): + def __init__( + self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0 + ): + super().__init__() + + inner_dim = dim * expansion_factor + padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0) + + self.net = nn.Sequential( + nn.LayerNorm(dim), + Transpose((1, 2)), + nn.Conv1d(dim, inner_dim * 2, 1), + GLU(dim=1), + DepthWiseConv1d( + inner_dim, inner_dim, kernel_size=kernel_size, padding=padding + ), + # nn.BatchNorm1d(inner_dim) if not causal else nn.Identity(), + Swish(), + nn.Conv1d(inner_dim, dim, 1), + Transpose((1, 2)), + nn.Dropout(dropout), + ) + + def forward(self, x): + return self.net(x) + + +def linear_attention(q, k, v): + if v is None: + # print (k.size(), q.size()) + out = torch.einsum("...ed,...nd->...ne", k, q) + return out + + else: + k_cumsum = k.sum(dim=-2) + # k_cumsum = k.sum(dim = -2) + D_inv = 1.0 / (torch.einsum("...nd,...d->...n", q, k_cumsum.type_as(q)) + 1e-8) + + context = torch.einsum("...nd,...ne->...de", k, v) + # print ("TRUEEE: ", context.size(), q.size(), D_inv.size()) + out = torch.einsum("...de,...nd,...n->...ne", context, q, D_inv) + return out + + +def gaussian_orthogonal_random_matrix( + nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None +): + nb_full_blocks = int(nb_rows / nb_columns) + # print (nb_full_blocks) + block_list = [] + + for _ in range(nb_full_blocks): + q = orthogonal_matrix_chunk( + nb_columns, qr_uniform_q=qr_uniform_q, device=device + ) + block_list.append(q) + # block_list[n] is a orthogonal matrix ... 
(model_dim * model_dim) + # print (block_list[0].size(), torch.einsum('...nd,...nd->...n', block_list[0], torch.roll(block_list[0],1,1))) + # print (nb_rows, nb_full_blocks, nb_columns) + remaining_rows = nb_rows - nb_full_blocks * nb_columns + # print (remaining_rows) + if remaining_rows > 0: + q = orthogonal_matrix_chunk( + nb_columns, qr_uniform_q=qr_uniform_q, device=device + ) + # print (q[:remaining_rows].size()) + block_list.append(q[:remaining_rows]) + + final_matrix = torch.cat(block_list) + + if scaling == 0: + multiplier = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1) + elif scaling == 1: + multiplier = math.sqrt(float(nb_columns)) * torch.ones( + (nb_rows,), device=device + ) + else: + raise ValueError(f"Invalid scaling {scaling}") + + return torch.diag(multiplier) @ final_matrix + + +class FastAttention(nn.Module): + def __init__( + self, + dim_heads, + nb_features=None, + ortho_scaling=0, + causal=False, + generalized_attention=False, + kernel_fn=nn.ReLU(), + qr_uniform_q=False, + no_projection=False, + causal_linear_attention=None, + causal_linear_attention_noncuda=None, + ): + super().__init__() + nb_features = default(nb_features, int(dim_heads * math.log(dim_heads))) + + self.dim_heads = dim_heads + self.nb_features = nb_features + self.ortho_scaling = ortho_scaling + + self.create_projection = partial( + gaussian_orthogonal_random_matrix, + nb_rows=self.nb_features, + nb_columns=dim_heads, + scaling=ortho_scaling, + qr_uniform_q=qr_uniform_q, + ) + projection_matrix = self.create_projection() + self.register_buffer("projection_matrix", projection_matrix) + + self.generalized_attention = generalized_attention + self.kernel_fn = kernel_fn + + # if this is turned on, no projection will be used + # queries and keys will be softmax-ed as in the original efficient attention paper + self.no_projection = no_projection + + self.causal = causal + if causal: + try: + self.causal_linear_fn = partial(causal_linear_attention) + except 
ImportError: + print( + "unable to import cuda code for auto-regressive Performer. will default to the memory inefficient non-cuda version" + ) + self.causal_linear_fn = causal_linear_attention_noncuda + + @torch.no_grad() + def redraw_projection_matrix(self): + projections = self.create_projection() + self.projection_matrix.copy_(projections) + del projections + + def forward(self, q, k, v, generalized_kernel): + device = q.device + + if self.no_projection: + q = q.softmax(dim=-1) + k = torch.exp(k) if self.causal else k.softmax(dim=-2) + + elif self.generalized_attention: + create_kernel = partial( + generalized_kernel, + kernel_fn=self.kernel_fn, + projection_matrix=self.projection_matrix, + device=device, + ) + q, k = map(create_kernel, (q, k)) + + else: + create_kernel = partial( + softmax_kernel, projection_matrix=self.projection_matrix, device=device + ) + + q = create_kernel(q, is_query=True) + k = create_kernel(k, is_query=False) + + attn_fn = linear_attention if not self.causal else self.causal_linear_fn + if v is None: + out = attn_fn(q, k, None) + return out + else: + out = attn_fn(q, k, v) + return out + + +class SelfAttention(nn.Module): + def __init__( + self, + dim, + causal=False, + heads=8, + dim_head=64, + local_heads=0, + local_window_size=256, + nb_features=None, + feature_redraw_interval=1000, + generalized_attention=False, + kernel_fn=nn.ReLU(), + qr_uniform_q=False, + dropout=0.0, + no_projection=False, + ): + super().__init__() + assert dim % heads == 0, "dimension must be divisible by number of heads" + dim_head = default(dim_head, dim // heads) + inner_dim = dim_head * heads + self.fast_attention = FastAttention( + dim_head, + nb_features, + causal=causal, + generalized_attention=generalized_attention, + kernel_fn=kernel_fn, + qr_uniform_q=qr_uniform_q, + no_projection=no_projection, + ) + + self.heads = heads + self.global_heads = heads - local_heads + self.local_attn = ( + LocalAttention( + window_size=local_window_size, + 
causal=causal, + autopad=True, + dropout=dropout, + look_forward=int(not causal), + rel_pos_emb_config=(dim_head, local_heads), + ) + if local_heads > 0 + else None + ) + + # print (heads, nb_features, dim_head) + # name_embedding = torch.zeros(110, heads, dim_head, dim_head) + # self.name_embedding = nn.Parameter(name_embedding, requires_grad=True) + + self.to_q = nn.Linear(dim, inner_dim) + self.to_k = nn.Linear(dim, inner_dim) + self.to_v = nn.Linear(dim, inner_dim) + self.to_out = nn.Linear(inner_dim, dim) + self.dropout = nn.Dropout(dropout) + + @torch.no_grad() + def redraw_projection_matrix(self): + self.fast_attention.redraw_projection_matrix() + # torch.nn.init.zeros_(self.name_embedding) + # print (torch.sum(self.name_embedding)) + + def forward( + self, + x, + context=None, + mask=None, + context_mask=None, + name=None, + inference=False, + **kwargs, + ): + _, _, _, h, gh = *x.shape, self.heads, self.global_heads + + cross_attend = exists(context) + + context = default(context, x) + context_mask = default(context_mask, mask) if not cross_attend else context_mask + # print (torch.sum(self.name_embedding)) + q, k, v = self.to_q(x), self.to_k(context), self.to_v(context) + + q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v)) + (q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v)) + + attn_outs = [] + # print (name) + # print (self.name_embedding[name].size()) + if not empty(q): + if exists(context_mask): + global_mask = context_mask[:, None, :, None] + v.masked_fill_(~global_mask, 0.0) + if cross_attend: + pass + # print (torch.sum(self.name_embedding)) + # out = self.fast_attention(q,self.name_embedding[name],None) + # print (torch.sum(self.name_embedding[...,-1:])) + else: + out = self.fast_attention(q, k, v) + attn_outs.append(out) + + if not empty(lq): + assert ( + not cross_attend + ), "local attention is not compatible with cross attention" + out = self.local_attn(lq, lk, lv, input_mask=mask) + 
attn_outs.append(out) + + out = torch.cat(attn_outs, dim=1) + out = rearrange(out, "b h n d -> b n (h d)") + out = self.to_out(out) + return self.dropout(out) diff --git a/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_unit2control.py b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_unit2control.py new file mode 100644 index 00000000..374cff59 --- /dev/null +++ b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_unit2control.py @@ -0,0 +1,84 @@ +import numpy as np +import torch +import torch.nn as nn +from torch.nn.utils import weight_norm + +from ._pcmer import PCmer + + +def split_to_dict(tensor, tensor_splits): + """Split a tensor into a dictionary of multiple tensors.""" + labels = [] + sizes = [] + + for k, v in tensor_splits.items(): + labels.append(k) + sizes.append(v) + + tensors = torch.split(tensor, sizes, dim=-1) + return dict(zip(labels, tensors)) + + +class Unit2Control(nn.Module): + def __init__(self, input_channel, n_spk, output_splits): + super().__init__() + self.output_splits = output_splits + self.f0_embed = nn.Linear(1, 256) + self.phase_embed = nn.Linear(1, 256) + self.volume_embed = nn.Linear(1, 256) + self.n_spk = n_spk + if n_spk is not None and n_spk > 1: + self.spk_embed = nn.Embedding(n_spk, 256) + + # conv in stack + self.stack = nn.Sequential( + nn.Conv1d(input_channel, 256, 3, 1, 1), + nn.GroupNorm(4, 256), + nn.LeakyReLU(), + nn.Conv1d(256, 256, 3, 1, 1), + ) + + # transformer + self.decoder = PCmer( + num_layers=3, + num_heads=8, + dim_model=256, + dim_keys=256, + dim_values=256, + residual_dropout=0.1, + attention_dropout=0.1, + ) + self.norm = nn.LayerNorm(256) + + # out + self.n_out = sum([v for k, v in output_splits.items()]) + self.dense_out = weight_norm(nn.Linear(256, self.n_out)) + + def forward(self, units, f0, phase, volume, spk_id=None, spk_mix_dict=None): + """ + input: + B x n_frames x n_unit + return: + dict of B x n_frames x feat + """ + + x = self.stack(units.transpose(1, 2)).transpose(1, 2) + x = ( + x + + 
self.f0_embed((1 + f0 / 700).log()) + + self.phase_embed(phase / np.pi) + + self.volume_embed(volume) + ) + if self.n_spk is not None and self.n_spk > 1: + if spk_mix_dict is not None: + for k, v in spk_mix_dict.items(): + spk_id_torch = torch.LongTensor(np.array([[k]])).to(units.device) + x = x + v * self.spk_embed(spk_id_torch - 1) + else: + x = x + self.spk_embed(spk_id - 1) + x = self.decoder(x) + x = self.norm(x) + e = self.dense_out(x) + controls = split_to_dict(e, self.output_splits) + + return controls diff --git a/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_vocoder.py b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_vocoder.py new file mode 100644 index 00000000..1bfda624 --- /dev/null +++ b/src/so_vits_svc_fork/modules/decoders/pc_ddsp/_vocoder.py @@ -0,0 +1,457 @@ +import numpy as np +import parselmouth +import pyworld as pw +import resampy +import torch +import torch.nn.functional as F +import torchcrepe + +from ._core import frequency_filter, remove_above_fmax, upsample +from ._unit2control import Unit2Control + + +class F0Extractor: + def __init__( + self, f0_extractor, sample_rate=44100, hop_size=512, f0_min=65, f0_max=800 + ): + self.f0_extractor = f0_extractor + self.sample_rate = sample_rate + self.hop_size = hop_size + self.f0_min = f0_min + self.f0_max = f0_max + + def extract( + self, audio, uv_interp=False, device=None, silence_front=0 + ): # audio: 1d numpy array + # extractor start time + n_frames = int(len(audio) // self.hop_size) + 1 + + start_frame = int(silence_front * self.sample_rate / self.hop_size) + real_silence_front = start_frame * self.hop_size / self.sample_rate + audio = audio[int(np.round(real_silence_front * self.sample_rate)) :] + + # extract f0 using parselmouth + if self.f0_extractor == "parselmouth": + f0 = ( + parselmouth.Sound(audio, self.sample_rate) + .to_pitch_ac( + time_step=self.hop_size / self.sample_rate, + voicing_threshold=0.6, + pitch_floor=self.f0_min, + pitch_ceiling=self.f0_max, + ) + 
.selected_array["frequency"] + ) + pad_size = ( + start_frame + (int(len(audio) // self.hop_size) - len(f0) + 1) // 2 + ) + f0 = np.pad(f0, (pad_size, n_frames - len(f0) - pad_size)) + + # extract f0 using dio + elif self.f0_extractor == "dio": + _f0, t = pw.dio( + audio.astype("double"), + self.sample_rate, + f0_floor=self.f0_min, + f0_ceil=self.f0_max, + channels_in_octave=2, + frame_period=(1000 * self.hop_size / self.sample_rate), + ) + f0 = pw.stonemask(audio.astype("double"), _f0, t, self.sample_rate) + f0 = np.pad( + f0.astype("float"), (start_frame, n_frames - len(f0) - start_frame) + ) + + # extract f0 using harvest + elif self.f0_extractor == "harvest": + f0, _ = pw.harvest( + audio.astype("double"), + self.sample_rate, + f0_floor=self.f0_min, + f0_ceil=self.f0_max, + frame_period=(1000 * self.hop_size / self.sample_rate), + ) + f0 = np.pad( + f0.astype("float"), (start_frame, n_frames - len(f0) - start_frame) + ) + + # extract f0 using crepe + elif self.f0_extractor == "crepe": + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + wav16k = resampy.resample(audio, self.sample_rate, 16000) + wav16k_torch = torch.FloatTensor(wav16k).unsqueeze(0).to(device) + + f0, pd = torchcrepe.predict( + wav16k_torch, + 16000, + 80, + self.f0_min, + self.f0_max, + pad=True, + model="full", + batch_size=512, + device=device, + return_periodicity=True, + ) + + pd = torchcrepe.filter.median(pd, 4) + pd = torchcrepe.threshold.Silence(-60.0)(pd, wav16k_torch, 16000, 80) + f0 = torchcrepe.threshold.At(0.05)(f0, pd) + f0 = torchcrepe.filter.mean(f0, 4) + f0 = torch.where(torch.isnan(f0), torch.full_like(f0, 0), f0) + + f0 = f0.squeeze(0).cpu().numpy() + f0 = np.array( + [ + f0[ + int( + min( + int( + np.round( + n * self.hop_size / self.sample_rate / 0.005 + ) + ), + len(f0) - 1, + ) + ) + ] + for n in range(n_frames - start_frame) + ] + ) + f0 = np.pad(f0, (start_frame, 0)) + + else: + raise ValueError(f" [x] Unknown f0 extractor: 
class VolumeExtractor:
    """Frame-wise RMS volume of a 1-D waveform.

    Args:
        hop_size: Integer number of samples per analysis frame.
    """

    def __init__(self, hop_size=512):
        self.hop_size = hop_size

    def extract(self, audio):  # audio: 1d numpy array
        """Return the RMS of ``audio`` over consecutive ``hop_size`` frames.

        The squared signal is reflect-padded by ``hop_size // 2`` samples on
        the left and ``(hop_size + 1) // 2`` on the right, then cut into
        ``len(audio) // hop_size + 1`` non-overlapping frames.

        Args:
            audio: 1-D numpy array of samples.

        Returns:
            1-D numpy array with one RMS value per frame. Empty input yields
            a single zero frame.
        """
        hop = int(self.hop_size)
        samples = np.asarray(audio)
        # Squaring integer PCM (e.g. int16) in its own dtype overflows, so
        # promote non-float input before computing the power.
        if not np.issubdtype(samples.dtype, np.floating):
            samples = samples.astype(np.float64)
        n_frames = len(samples) // hop + 1
        if len(samples) == 0:
            # np.pad(mode="reflect") cannot extend an empty axis.
            return np.zeros(n_frames, dtype=samples.dtype)
        power = np.pad(
            samples**2,
            (hop // 2, (hop + 1) // 2),
            mode="reflect",
        )
        # The padded length is len(audio) + hop, which always holds n_frames
        # whole frames, so one reshape replaces the per-frame Python loop.
        frames = power[: n_frames * hop].reshape(n_frames, hop)
        return np.sqrt(frames.mean(axis=1))


def _wrapped_phase_cycles(f0, sampling_rate, initial_phase=None, infer=True):
    """Accumulate the exciter phase, in cycles, from a sample-rate f0 track.

    Args:
        f0: Upsampled f0 tensor; the running sum is taken along dim 1.
        sampling_rate: Sampling rate that f0 is divided by.
        initial_phase: Optional phase offset in radians, added after
            conversion to cycles.
        infer: When true the running sum is computed in float64
            (presumably to limit accumulated rounding error on long
            inputs -- TODO confirm); otherwise in f0's own dtype.

    Returns:
        Tensor with f0's dtype and device holding the accumulated phase in
        cycles with the nearest integer subtracted.
    """
    increments = (f0.double() if infer else f0) / sampling_rate
    cycles = torch.cumsum(increments, dim=1)
    if initial_phase is not None:
        cycles += initial_phase.to(cycles) / 2 / np.pi
    cycles = cycles - torch.round(cycles)
    return cycles.to(f0)


class Sins(torch.nn.Module):
    """Vocoder built from a bank of harmonic sinusoids plus filtered noise.

    Args:
        sampling_rate: Audio sampling rate, stored as a buffer.
        block_size: Samples per frame, stored as a buffer.
        n_harmonics: Size of the predicted "amplitudes" control.
        n_mag_allpass: Size of the predicted "group_delay" control.
        n_mag_noise: Size of the predicted "noise_magnitude" control.
        n_unit: Forwarded to ``Unit2Control``.
        n_spk: Forwarded to ``Unit2Control``.
    """

    def __init__(
        self,
        sampling_rate,
        block_size,
        n_harmonics,
        n_mag_allpass,
        n_mag_noise,
        n_unit=256,
        n_spk=1,
    ):
        super().__init__()

        # params
        self.register_buffer("sampling_rate", torch.tensor(sampling_rate))
        self.register_buffer("block_size", torch.tensor(block_size))
        # Unit2Control
        split_map = {
            "amplitudes": n_harmonics,
            "group_delay": n_mag_allpass,
            "noise_magnitude": n_mag_noise,
        }
        self.unit2ctrl = Unit2Control(n_unit, n_spk, split_map)

    def forward(
        self,
        units_frames,
        f0_frames,
        volume_frames,
        spk_id=None,
        spk_mix_dict=None,
        initial_phase=None,
        infer=True,
        max_upsample_dim=32,
    ):
        """
        units_frames: B x n_frames x n_unit
        f0_frames: B x n_frames x 1
        volume_frames: B x n_frames x 1
        spk_id: B x 1

        Returns ``(signal, phase, (harmonic, noise))`` where ``phase`` is the
        sample-rate exciter phase in radians.
        """
        # exciter phase
        f0 = upsample(f0_frames, self.block_size)
        x = _wrapped_phase_cycles(f0, self.sampling_rate, initial_phase, infer)

        phase = 2 * np.pi * x
        phase_frames = phase[:, :: self.block_size, :]

        # parameter prediction
        ctrls = self.unit2ctrl(
            units_frames,
            f0_frames,
            phase_frames,
            volume_frames,
            spk_id=spk_id,
            spk_mix_dict=spk_mix_dict,
        )

        amplitudes_frames = torch.exp(ctrls["amplitudes"]) / 128
        group_delay = np.pi * torch.tanh(ctrls["group_delay"])
        noise_param = torch.exp(ctrls["noise_magnitude"]) / 128

        # sinusoids exciter signal
        amplitudes_frames = remove_above_fmax(
            amplitudes_frames, f0_frames, self.sampling_rate / 2, level_start=1
        )
        n_harmonic = amplitudes_frames.shape[-1]
        level_harmonic = torch.arange(1, n_harmonic + 1).to(phase)
        sinusoids = 0.0
        # Harmonics are upsampled and summed max_upsample_dim at a time rather
        # than all at once.
        for n in range((n_harmonic - 1) // max_upsample_dim + 1):
            start = n * max_upsample_dim
            end = (n + 1) * max_upsample_dim
            phases = phase * level_harmonic[start:end]
            amplitudes = upsample(amplitudes_frames[:, :, start:end], self.block_size)
            sinusoids += (torch.sin(phases) * amplitudes).sum(-1)

        # harmonic part filter (apply group-delay)
        harmonic = frequency_filter(
            sinusoids,
            torch.exp(1.0j * torch.cumsum(group_delay, axis=-1)),
            hann_window=False,
        )

        # noise part filter
        noise = torch.rand_like(harmonic) * 2 - 1
        noise = frequency_filter(
            noise,
            torch.complex(noise_param, torch.zeros_like(noise_param)),
            hann_window=True,
        )

        signal = harmonic + noise

        return signal, phase, (harmonic, noise)


class CombSubFast(torch.nn.Module):
    """Vocoder that filters a sinc "combtooth" exciter and noise per frame.

    Each signal is cut into windowed frames of ``2 * block_size`` samples
    with hop ``block_size``, multiplied in the rfft domain by predicted
    filters, and overlap-added back into a waveform.

    Args:
        sampling_rate: Audio sampling rate, stored as a buffer.
        block_size: Samples per frame, stored as a buffer.
        n_unit: Forwarded to ``Unit2Control``.
        n_spk: Forwarded to ``Unit2Control``.
    """

    def __init__(self, sampling_rate, block_size, n_unit=256, n_spk=1):
        super().__init__()

        # params
        self.register_buffer("sampling_rate", torch.tensor(sampling_rate))
        self.register_buffer("block_size", torch.tensor(block_size))
        self.register_buffer("window", torch.sqrt(torch.hann_window(2 * block_size)))
        # Unit2Control
        split_map = {
            "harmonic_magnitude": block_size + 1,
            "harmonic_phase": block_size + 1,
            "noise_magnitude": block_size + 1,
        }
        self.unit2ctrl = Unit2Control(n_unit, n_spk, split_map)

    def _windowed_rfft(self, wave):
        """Pad ``wave`` by one block per side, frame it, window it, rfft it."""
        frames = F.pad(wave, (self.block_size, self.block_size)).unfold(
            1, 2 * self.block_size, self.block_size
        )
        return torch.fft.rfft(frames * self.window, 2 * self.block_size)

    def forward(
        self,
        units_frames,
        f0_frames,
        volume_frames,
        spk_id=None,
        spk_mix_dict=None,
        initial_phase=None,
        infer=True,
        **kwargs,
    ):
        """
        units_frames: B x n_frames x n_unit
        f0_frames: B x n_frames x 1
        volume_frames: B x n_frames x 1
        spk_id: B x 1

        Returns ``(signal, phase_frames, (signal, signal))``; the third item
        repeats the mixed signal because harmonic and noise parts are not
        separated in this model.
        """
        # exciter phase
        f0 = upsample(f0_frames, self.block_size)
        x = _wrapped_phase_cycles(f0, self.sampling_rate, initial_phase, infer)

        phase_frames = 2 * np.pi * x[:, :: self.block_size, :]

        # parameter prediction
        ctrls = self.unit2ctrl(
            units_frames,
            f0_frames,
            phase_frames,
            volume_frames,
            spk_id=spk_id,
            spk_mix_dict=spk_mix_dict,
        )

        # The last predicted frame is repeated so that each filter has one
        # more entry along dim 1.
        src_filter = torch.exp(
            ctrls["harmonic_magnitude"] + 1.0j * np.pi * ctrls["harmonic_phase"]
        )
        src_filter = torch.cat((src_filter, src_filter[:, -1:, :]), 1)
        noise_filter = torch.exp(ctrls["noise_magnitude"]) / 128
        noise_filter = torch.cat((noise_filter, noise_filter[:, -1:, :]), 1)

        # combtooth exciter signal
        combtooth = torch.sinc(self.sampling_rate * x / (f0 + 1e-3))
        combtooth = combtooth.squeeze(-1)
        combtooth_fft = self._windowed_rfft(combtooth)

        # noise exciter signal
        noise = torch.rand_like(combtooth) * 2 - 1
        noise_fft = self._windowed_rfft(noise)

        # apply the filters
        signal_fft = combtooth_fft * src_filter + noise_fft * noise_filter

        # take the ifft to resynthesize audio.
        signal_frames_out = (
            torch.fft.irfft(signal_fft, 2 * self.block_size) * self.window
        )

        # overlap add
        fold = torch.nn.Fold(
            output_size=(1, (signal_frames_out.size(1) + 1) * self.block_size),
            kernel_size=(1, 2 * self.block_size),
            stride=(1, self.block_size),
        )
        signal = fold(signal_frames_out.transpose(1, 2))[
            :, 0, 0, self.block_size : -self.block_size
        ]

        return signal, phase_frames, (signal, signal)


class CombSub(torch.nn.Module):
    """Vocoder that shapes a sinc "combtooth" exciter and noise with filters.

    Args:
        sampling_rate: Audio sampling rate, stored as a buffer.
        block_size: Samples per frame, stored as a buffer.
        n_mag_allpass: Size of the predicted "group_delay" control.
        n_mag_harmonic: Size of the predicted "harmonic_magnitude" control.
        n_mag_noise: Size of the predicted "noise_magnitude" control.
        n_unit: Forwarded to ``Unit2Control``.
        n_spk: Forwarded to ``Unit2Control``.
    """

    def __init__(
        self,
        sampling_rate,
        block_size,
        n_mag_allpass,
        n_mag_harmonic,
        n_mag_noise,
        n_unit=256,
        n_spk=1,
    ):
        super().__init__()

        # params
        self.register_buffer("sampling_rate", torch.tensor(sampling_rate))
        self.register_buffer("block_size", torch.tensor(block_size))
        # Unit2Control
        split_map = {
            "group_delay": n_mag_allpass,
            "harmonic_magnitude": n_mag_harmonic,
            "noise_magnitude": n_mag_noise,
        }
        self.unit2ctrl = Unit2Control(n_unit, n_spk, split_map)

    def forward(
        self,
        units_frames,
        f0_frames,
        volume_frames,
        spk_id=None,
        spk_mix_dict=None,
        initial_phase=None,
        infer=True,
        **kwargs,
    ):
        """
        units_frames: B x n_frames x n_unit
        f0_frames: B x n_frames x 1
        volume_frames: B x n_frames x 1
        spk_id: B x 1

        Returns ``(signal, phase_frames, (harmonic, noise))``.
        """
        # exciter phase
        f0 = upsample(f0_frames, self.block_size)
        x = _wrapped_phase_cycles(f0, self.sampling_rate, initial_phase, infer)

        phase_frames = 2 * np.pi * x[:, :: self.block_size, :]

        # parameter prediction
        ctrls = self.unit2ctrl(
            units_frames,
            f0_frames,
            phase_frames,
            volume_frames,
            spk_id=spk_id,
            spk_mix_dict=spk_mix_dict,
        )

        group_delay = np.pi * torch.tanh(ctrls["group_delay"])
        src_param = torch.exp(ctrls["harmonic_magnitude"])
        noise_param = torch.exp(ctrls["noise_magnitude"]) / 128

        # combtooth exciter signal
        combtooth = torch.sinc(self.sampling_rate * x / (f0 + 1e-3))
        combtooth = combtooth.squeeze(-1)

        # harmonic part filter (using dynamic-windowed LTV-FIR, with group-delay prediction)
        harmonic = frequency_filter(
            combtooth,
            torch.exp(1.0j * torch.cumsum(group_delay, axis=-1)),
            hann_window=False,
        )
        harmonic = frequency_filter(
            harmonic,
            torch.complex(src_param, torch.zeros_like(src_param)),
            hann_window=True,
            half_width_frames=1.5 * self.sampling_rate / (f0_frames + 1e-3),
        )

        # noise part filter (using constant-windowed LTV-FIR, without group-delay)
        noise = torch.rand_like(harmonic) * 2 - 1
        noise = frequency_filter(
            noise,
            torch.complex(noise_param, torch.zeros_like(noise_param)),
            hann_window=True,
        )

        signal = harmonic + noise

        return signal, phase_frames, (harmonic, noise)
torch.norm(embeds_raw, dim=1, keepdim=True) - def compute_partial_slices(self, total_frames, partial_frames, partial_hop): + def compute_partial_slices( + self, total_frames: int, partial_frames: int, partial_hop: int + ) -> list[Tensor]: mel_slices = [] for i in range(0, total_frames - partial_frames, partial_hop): mel_range = torch.arange(i, i + partial_frames) mel_slices.append(mel_range) - return mel_slices - def embed_utterance(self, mel, partial_frames=128, partial_hop=64): + def embed_utterance( + self, mel: Tensor, partial_frames: int = 128, partial_hop: int = 64 + ) -> Tensor: mel_len = mel.size(1) last_mel = mel[:, -partial_frames:] @@ -58,16 +61,16 @@ def embed_utterance(self, mel, partial_frames=128, partial_hop=64): return embed -class Encoder(nn.Module): +class PosteriorEncoder(nn.Module): def __init__( self, - in_channels, - out_channels, - hidden_channels, - kernel_size, - dilation_rate, - n_layers, - gin_channels=0, + in_channels: int, + out_channels: int, + hidden_channels: int, + kernel_size: int, + dilation_rate: int, + n_layers: int, + gin_channels: int = 0, ): super().__init__() self.in_channels = in_channels @@ -79,7 +82,7 @@ def __init__( self.gin_channels = gin_channels self.pre = nn.Conv1d(in_channels, hidden_channels, 1) - self.enc = modules.WN( + self.enc = so_vits_svc_fork.modules.flows.WN( hidden_channels, kernel_size, dilation_rate, @@ -88,8 +91,7 @@ def __init__( ) self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) - def forward(self, x, x_lengths, g=None): - # print(x.shape,x_lengths.shape) + def forward(self, x: Tensor, x_lengths: Tensor, g: Tensor | None = None): x_mask = torch.unsqueeze(commons.sequence_mask(x_lengths, x.size(2)), 1).to( x.dtype ) @@ -104,14 +106,14 @@ def forward(self, x, x_lengths, g=None): class TextEncoder(nn.Module): def __init__( self, - out_channels, - hidden_channels, - kernel_size, - n_layers, - gin_channels=0, - filter_channels=None, - n_heads=None, - p_dropout=None, + out_channels: int, + 
hidden_channels: int, + kernel_size: int, + n_layers: int, + gin_channels: int, + filter_channels: int, + n_heads: int, + p_dropout: float, ): super().__init__() self.out_channels = out_channels @@ -119,18 +121,17 @@ def __init__( self.kernel_size = kernel_size self.n_layers = n_layers self.gin_channels = gin_channels - self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) - self.f0_emb = nn.Embedding(256, hidden_channels) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + self.f0_emb = nn.Embedding(gin_channels, hidden_channels) self.enc_ = attentions.Encoder( hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout ) - def forward(self, x, x_mask, f0=None, noice_scale=1): + def forward(self, x: Tensor, x_mask: Tensor, f0: Tensor, noise_scale: float = 1): x = x + self.f0_emb(f0).transpose(1, 2) x = self.enc_(x * x_mask, x_mask) stats = self.proj(x) * x_mask m, logs = torch.split(stats, self.out_channels, dim=1) - z = (m + torch.randn_like(m) * torch.exp(logs) * noice_scale) * x_mask - + z = (m + torch.randn_like(m) * torch.exp(logs) * noise_scale) * x_mask return z, m, logs, x_mask diff --git a/src/so_vits_svc_fork/modules/flows.py b/src/so_vits_svc_fork/modules/flows.py index 9abcba21..f05bed47 100644 --- a/src/so_vits_svc_fork/modules/flows.py +++ b/src/so_vits_svc_fork/modules/flows.py @@ -1,6 +1,7 @@ +import torch from torch import nn -from so_vits_svc_fork.modules import modules as modules +from .commons import fused_add_tanh_sigmoid_multiply class ResidualCouplingBlock(nn.Module): @@ -26,7 +27,7 @@ def __init__( self.flows = nn.ModuleList() for i in range(n_flows): self.flows.append( - modules.ResidualCouplingLayer( + ResidualCouplingLayer( channels, hidden_channels, kernel_size, @@ -36,7 +37,7 @@ def __init__( mean_only=True, ) ) - self.flows.append(modules.Flip()) + self.flows.append(Flip()) def forward(self, x, x_mask, g=None, reverse=False): if not reverse: @@ -46,3 +47,158 @@ def forward(self, x, x_mask, g=None, 
class ResidualCouplingLayer(nn.Module):
    """Affine coupling layer: the first channel half conditions the second.

    Args:
        channels: Number of input channels; must be even.
        hidden_channels: Width of the internal ``WN`` stack.
        kernel_size: Kernel size passed to ``WN``.
        dilation_rate: Dilation base passed to ``WN``.
        n_layers: Number of ``WN`` layers.
        p_dropout: Dropout probability passed to ``WN``.
        gin_channels: Conditioning channels passed to ``WN``.
        mean_only: When true only a shift is predicted and the log-scale is
            fixed at zero.
    """

    def __init__(
        self,
        channels,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        p_dropout=0,
        gin_channels=0,
        mean_only=False,
    ):
        assert channels % 2 == 0, "channels should be divisible by 2"
        super().__init__()
        self.channels = channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.half_channels = channels // 2
        self.mean_only = mean_only

        self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
        self.enc = WN(
            hidden_channels,
            kernel_size,
            dilation_rate,
            n_layers,
            p_dropout=p_dropout,
            gin_channels=gin_channels,
        )
        # One output group (mean) when mean_only, otherwise two (mean, log-scale).
        self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)
        # Zero-initialised projection: the layer starts out as the identity.
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, x_mask, g=None, reverse=False):
        """Apply the coupling transform, or its inverse when ``reverse``.

        Returns ``(x, logdet)`` in the forward direction and only ``x`` in
        the reverse direction.
        """
        x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
        h = self.pre(x0) * x_mask
        h = self.enc(h, x_mask, g=g)
        stats = self.post(h) * x_mask
        if not self.mean_only:
            m, logs = torch.split(stats, [self.half_channels] * 2, 1)
        else:
            m = stats
            logs = torch.zeros_like(m)

        if not reverse:
            x1 = m + x1 * torch.exp(logs) * x_mask
            x = torch.cat([x0, x1], 1)
            logdet = torch.sum(logs, [1, 2])
            return x, logdet
        else:
            x1 = (x1 - m) * torch.exp(-logs) * x_mask
            x = torch.cat([x0, x1], 1)
            return x


class WN(torch.nn.Module):
    """Stack of weight-normalised dilated 1-D convolutions with gated units.

    Args:
        hidden_channels: Channel width of the input and of the output.
        kernel_size: Odd kernel size of the dilated convolutions.
        dilation_rate: Layer ``i`` uses dilation ``dilation_rate ** i``.
        n_layers: Number of layers.
        gin_channels: Channels of the optional conditioning input; ``0``
            means no conditioning layer is created.
        p_dropout: Dropout probability applied after each gated unit.
    """

    def __init__(
        self,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        gin_channels=0,
        p_dropout=0,
    ):
        super().__init__()
        assert kernel_size % 2 == 1
        self.hidden_channels = hidden_channels
        # NOTE: stored as a 1-tuple; kept unchanged in case other code reads it.
        self.kernel_size = (kernel_size,)
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.gin_channels = gin_channels
        self.p_dropout = p_dropout

        self.in_layers = torch.nn.ModuleList()
        self.res_skip_layers = torch.nn.ModuleList()
        self.drop = nn.Dropout(p_dropout)

        if gin_channels != 0:
            # A single 1x1 convolution yields the conditioning for all layers.
            cond_layer = torch.nn.Conv1d(
                gin_channels, 2 * hidden_channels * n_layers, 1
            )
            self.cond_layer = torch.nn.utils.weight_norm(cond_layer, name="weight")

        for i in range(n_layers):
            dilation = dilation_rate**i
            # "Same" padding for an odd kernel at this dilation.
            padding = int((kernel_size * dilation - dilation) / 2)
            in_layer = torch.nn.Conv1d(
                hidden_channels,
                2 * hidden_channels,
                kernel_size,
                dilation=dilation,
                padding=padding,
            )
            in_layer = torch.nn.utils.weight_norm(in_layer, name="weight")
            self.in_layers.append(in_layer)

            # last one is not necessary
            if i < n_layers - 1:
                res_skip_channels = 2 * hidden_channels
            else:
                res_skip_channels = hidden_channels

            res_skip_layer = torch.nn.Conv1d(hidden_channels, res_skip_channels, 1)
            res_skip_layer = torch.nn.utils.weight_norm(res_skip_layer, name="weight")
            self.res_skip_layers.append(res_skip_layer)

    def forward(self, x, x_mask, g=None, **kwargs):
        """Return the masked sum of every layer's skip output.

        ``g`` is the optional conditioning input; when it is ``None`` zeros
        are used in its place.
        """
        output = torch.zeros_like(x)
        n_channels_tensor = torch.IntTensor([self.hidden_channels])

        if g is not None:
            g = self.cond_layer(g)

        for i in range(self.n_layers):
            x_in = self.in_layers[i](x)
            if g is not None:
                cond_offset = i * 2 * self.hidden_channels
                g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :]
            else:
                g_l = torch.zeros_like(x_in)

            acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)
            acts = self.drop(acts)

            res_skip_acts = self.res_skip_layers[i](acts)
            if i < self.n_layers - 1:
                # First half is the residual update, second half the skip output.
                res_acts = res_skip_acts[:, : self.hidden_channels, :]
                x = (x + res_acts) * x_mask
                output = output + res_skip_acts[:, self.hidden_channels :, :]
            else:
                output = output + res_skip_acts
        return output * x_mask

    def remove_weight_norm(self):
        """Strip weight normalisation from every convolution in the stack."""
        if self.gin_channels != 0:
            torch.nn.utils.remove_weight_norm(self.cond_layer)
        for layer in self.in_layers:
            torch.nn.utils.remove_weight_norm(layer)
        for layer in self.res_skip_layers:
            torch.nn.utils.remove_weight_norm(layer)


class Flip(nn.Module):
    """Reverse the order of dim 1 of the input."""

    def forward(self, x, *args, reverse=False, **kwargs):
        """Return ``(flipped, logdet)``, or only ``flipped`` when ``reverse``.

        ``logdet`` is a zero vector with one entry per batch item.
        """
        x = torch.flip(x, [1])
        if not reverse:
            # Allocate directly with x's dtype and device instead of building
            # the zeros on the CPU and converting them afterwards.
            logdet = x.new_zeros(x.size(0))
            return x, logdet
        else:
            return x
res_skip_channels = hidden_channels - - res_skip_layer = torch.nn.Conv1d(hidden_channels, res_skip_channels, 1) - res_skip_layer = torch.nn.utils.weight_norm(res_skip_layer, name="weight") - self.res_skip_layers.append(res_skip_layer) - - def forward(self, x, x_mask, g=None, **kwargs): - output = torch.zeros_like(x) - n_channels_tensor = torch.IntTensor([self.hidden_channels]) - - if g is not None: - g = self.cond_layer(g) - - for i in range(self.n_layers): - x_in = self.in_layers[i](x) - if g is not None: - cond_offset = i * 2 * self.hidden_channels - g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :] - else: - g_l = torch.zeros_like(x_in) - - acts = commons.fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor) - acts = self.drop(acts) - - res_skip_acts = self.res_skip_layers[i](acts) - if i < self.n_layers - 1: - res_acts = res_skip_acts[:, : self.hidden_channels, :] - x = (x + res_acts) * x_mask - output = output + res_skip_acts[:, self.hidden_channels :, :] - else: - output = output + res_skip_acts - return output * x_mask - - def remove_weight_norm(self): - if self.gin_channels != 0: - torch.nn.utils.remove_weight_norm(self.cond_layer) - for l in self.in_layers: - torch.nn.utils.remove_weight_norm(l) - for l in self.res_skip_layers: - torch.nn.utils.remove_weight_norm(l) - - class ResBlock1(torch.nn.Module): def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)): super().__init__() @@ -367,16 +278,6 @@ def forward(self, x, x_mask, reverse=False, **kwargs): return x -class Flip(nn.Module): - def forward(self, x, *args, reverse=False, **kwargs): - x = torch.flip(x, [1]) - if not reverse: - logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device) - return x, logdet - else: - return x - - class ElementwiseAffine(nn.Module): def __init__(self, channels): super().__init__() @@ -393,60 +294,3 @@ def forward(self, x, x_mask, reverse=False, **kwargs): else: x = (x - self.m) * torch.exp(-self.logs) * x_mask return x - - -class 
ResidualCouplingLayer(nn.Module): - def __init__( - self, - channels, - hidden_channels, - kernel_size, - dilation_rate, - n_layers, - p_dropout=0, - gin_channels=0, - mean_only=False, - ): - assert channels % 2 == 0, "channels should be divisible by 2" - super().__init__() - self.channels = channels - self.hidden_channels = hidden_channels - self.kernel_size = kernel_size - self.dilation_rate = dilation_rate - self.n_layers = n_layers - self.half_channels = channels // 2 - self.mean_only = mean_only - - self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1) - self.enc = WN( - hidden_channels, - kernel_size, - dilation_rate, - n_layers, - p_dropout=p_dropout, - gin_channels=gin_channels, - ) - self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1) - self.post.weight.data.zero_() - self.post.bias.data.zero_() - - def forward(self, x, x_mask, g=None, reverse=False): - x0, x1 = torch.split(x, [self.half_channels] * 2, 1) - h = self.pre(x0) * x_mask - h = self.enc(h, x_mask, g=g) - stats = self.post(h) * x_mask - if not self.mean_only: - m, logs = torch.split(stats, [self.half_channels] * 2, 1) - else: - m = stats - logs = torch.zeros_like(m) - - if not reverse: - x1 = m + x1 * torch.exp(logs) * x_mask - x = torch.cat([x0, x1], 1) - logdet = torch.sum(logs, [1, 2]) - return x, logdet - else: - x1 = (x1 - m) * torch.exp(-logs) * x_mask - x = torch.cat([x0, x1], 1) - return x diff --git a/src/so_vits_svc_fork/modules/synthesizers.py b/src/so_vits_svc_fork/modules/synthesizers.py index c96e021b..10772a43 100644 --- a/src/so_vits_svc_fork/modules/synthesizers.py +++ b/src/so_vits_svc_fork/modules/synthesizers.py @@ -3,7 +3,7 @@ from typing import Any, Literal, Sequence import torch -from torch import nn +from torch import Tensor, nn import so_vits_svc_fork.f0 from so_vits_svc_fork.f0 import f0_to_coarse @@ -15,9 +15,11 @@ Multistream_iSTFT_Generator, iSTFT_Generator, ) -from so_vits_svc_fork.modules.encoders import Encoder, TextEncoder +from 
so_vits_svc_fork.modules.encoders import PosteriorEncoder, TextEncoder from so_vits_svc_fork.modules.flows import ResidualCouplingBlock +from ..hparams import HParams + LOG = getLogger(__name__) @@ -47,10 +49,28 @@ def __init__( ssl_dim: int, n_speakers: int, sampling_rate: int = 44100, - type_: Literal["hifi-gan", "istft", "ms-istft", "mb-istft"] = "hifi-gan", + type_: Literal[ + "hifi-gan", + "istft", + "ms-istft", + "mb-istft", + "ddsp-sins", + "ddsp-combsub", + "ddsp-combsubfast", + ] = "hifi-gan", gen_istft_n_fft: int = 16, gen_istft_hop_size: int = 4, subbands: int = 4, + encoder_n_layers: int = 16, + flow_n_layers: int = 4, + n_flows: int = 4, + flow_kernel_size: int = 3, + block_size: int = 512, + n_harmonics: int = 128, + n_mag_allpass: int = 256, + n_mag_harmonic: int = 512, + n_mag_noise: int = 256, + bigvgan_h: HParams | None = None, **kwargs: Any, ): super().__init__() @@ -76,7 +96,12 @@ def __init__( self.type_ = type_ self.gen_istft_n_fft = gen_istft_n_fft self.gen_istft_hop_size = gen_istft_hop_size + self.n_layers_encoder = encoder_n_layers + self.n_layers_flow = flow_n_layers + self.n_flows = n_flows + self.flow_kernel_size = flow_kernel_size self.subbands = subbands + self.type_ = type_ if kwargs: warnings.warn(f"Unused arguments: {kwargs}") @@ -90,6 +115,7 @@ def __init__( self.enc_p = TextEncoder( inter_channels, hidden_channels, + gin_channels=256, filter_channels=filter_channels, n_heads=n_heads, n_layers=n_layers, @@ -111,8 +137,8 @@ def __init__( "gin_channels": gin_channels, } self.dec = NSFHifiGANGenerator(h=hps) - self.mb = False - else: + self._return_mb = False + elif "istft" in type_: hps = { "initial_channel": inter_channels, "resblock": resblock, @@ -135,64 +161,127 @@ def __init__( self.dec = Multistream_iSTFT_Generator(**hps) elif type_ == "mb-istft": self.dec = Multiband_iSTFT_Generator(**hps) - else: - raise ValueError(f"Unknown type: {type_}") - self.mb = True + elif type_ in ["ddsp-sins", "ddsp-combsub", "ddsp-combsubfast"]: + 
from .decoders.pc_ddsp import CombSub, CombSubFast, Sins - self.enc_q = Encoder( - spec_channels, - inter_channels, - hidden_channels, - 5, - 1, - 16, + if type_ == "ddsp-sins": + self.dec = Sins( + sampling_rate=sampling_rate, + block_size=block_size, + n_harmonics=n_harmonics, + n_mag_allpass=n_mag_allpass, + n_mag_noise=n_mag_noise, + n_unit=inter_channels, + n_spk=n_speakers, + ) + elif type_ == "ddsp-combsub": + self.dec = CombSub( + sampling_rate=sampling_rate, + block_size=block_size, + n_mag_allpass=n_mag_allpass, + n_mag_harmonic=n_mag_harmonic, + n_mag_noise=n_mag_noise, + n_unit=inter_channels, + n_spk=n_speakers, + ) + elif type_ == "ddsp-combsubfast": + self.dec = CombSubFast( + sampling_rate=sampling_rate, + block_size=block_size, + n_unit=inter_channels, + n_spk=n_speakers, + ) + elif type_ == "bigvgan": + from .decoders.bigvgan import BigVGAN + + self.dec = BigVGAN(bigvgan_h) + else: + raise ValueError(f"Unknown type: {type_}") + + self.enc_q = PosteriorEncoder( + in_channels=spec_channels, + out_channels=inter_channels, + hidden_channels=hidden_channels, + kernel_size=flow_kernel_size, + dilation_rate=1, + n_layers=encoder_n_layers, gin_channels=gin_channels, ) self.flow = ResidualCouplingBlock( - inter_channels, hidden_channels, 5, 1, 4, gin_channels=gin_channels + channels=inter_channels, + hidden_channels=hidden_channels, + kernel_size=flow_kernel_size, + dilation_rate=1, + n_layers=flow_n_layers, + n_flows=n_flows, + gin_channels=gin_channels, ) self.f0_decoder = F0Decoder( - 1, - hidden_channels, - filter_channels, - n_heads, - n_layers, - kernel_size, - p_dropout, + out_channels=1, + hidden_channels=hidden_channels, + filter_channels=filter_channels, + n_heads=n_heads, + n_layers=n_layers, + kernel_size=kernel_size, + p_dropout=p_dropout, spk_channels=gin_channels, ) self.emb_uv = nn.Embedding(2, hidden_channels) - def forward(self, c, f0, uv, spec, g=None, c_lengths=None, spec_lengths=None): - g = self.emb_g(g).transpose(1, 2) + def forward( 
+ self, + c: Tensor, + f0: Tensor, + uv: Tensor, + spec: Tensor, + spk: Tensor | None = None, + c_lengths: Tensor | None = None, + spec_lengths: Tensor | None = None, + volume: Tensor | None = None, + ): + # speaker embedding + g = self.emb_g(spk).transpose(1, 2) + # ssl prenet x_mask = torch.unsqueeze(commons.sequence_mask(c_lengths, c.size(2)), 1).to( c.dtype ) x = self.pre(c) * x_mask + self.emb_uv(uv.long()).transpose(1, 2) - # f0 predict + # f0 decoder lf0 = 2595.0 * torch.log10(1.0 + f0.unsqueeze(1) / 700.0) / 500 norm_lf0 = so_vits_svc_fork.f0.normalize_f0(lf0, x_mask, uv) pred_lf0 = self.f0_decoder(x, norm_lf0, x_mask, spk_emb=g) - # encoder - z_ptemp, m_p, logs_p, _ = self.enc_p(x, x_mask, f0=f0_to_coarse(f0)) + # posterior encoder + _, m_p, logs_p, _ = self.enc_p(x, x_mask, f0=f0_to_coarse(f0)) + + # spectrogram encoder z, m_q, logs_q, spec_mask = self.enc_q(spec, spec_lengths, g=g) # flow z_p = self.flow(z, spec_mask, g=g) + + # slice z, pitch with segment_size to decrease memory usage z_slice, pitch_slice, ids_slice = commons.rand_slice_segments_with_pitch( z, f0, spec_lengths, self.segment_size ) - # MB-iSTFT-VITS - if self.mb: + # decoder + o_mb = None + if "istft" in self.type_: o, o_mb = self.dec(z_slice, g=g) - # HiFi-GAN + elif "ddsp" in self.type_: + o, _, (s_h, s_n) = self.dec( + z_slice.transpose(1, 2), + pitch_slice.unsqueeze(-1), + volume.transpose(0, 1), + spk.long(), + ) + elif "bigvgan" in self.type_: + o = self.dec(z_slice) else: o = self.dec(z_slice, g=g, f0=pitch_slice) - o_mb = None return ( o, o_mb, @@ -204,30 +293,57 @@ def forward(self, c, f0, uv, spec, g=None, c_lengths=None, spec_lengths=None): lf0, ) - def infer(self, c, f0, uv, g=None, noice_scale=0.35, predict_f0=False): + def infer( + self, + c: Tensor, + f0: Tensor, + uv: Tensor, + spk: Tensor, + noise_scale: float = 0.35, + predict_f0: bool = False, + volume: Tensor | None = None, + ) -> Tensor: c_lengths = (torch.ones(c.size(0)) * c.size(-1)).to(c.device) - g = 
self.emb_g(g).transpose(1, 2) + + # speaker embedding + spk = self.emb_g(spk).transpose(1, 2) + + # ssl prenet x_mask = torch.unsqueeze(commons.sequence_mask(c_lengths, c.size(2)), 1).to( c.dtype ) x = self.pre(c) * x_mask + self.emb_uv(uv.long()).transpose(1, 2) + # f0 decoder if predict_f0: lf0 = 2595.0 * torch.log10(1.0 + f0.unsqueeze(1) / 700.0) / 500 norm_lf0 = so_vits_svc_fork.f0.normalize_f0( lf0, x_mask, uv, random_scale=False ) - pred_lf0 = self.f0_decoder(x, norm_lf0, x_mask, spk_emb=g) + pred_lf0 = self.f0_decoder(x, norm_lf0, x_mask, spk_emb=spk) f0 = (700 * (torch.pow(10, pred_lf0 * 500 / 2595) - 1)).squeeze(1) - z_p, m_p, logs_p, c_mask = self.enc_p( - x, x_mask, f0=f0_to_coarse(f0), noice_scale=noice_scale + # posterior encoder + z_p, _, _, c_mask = self.enc_p( + x, x_mask, f0=f0_to_coarse(f0), noise_scale=noise_scale ) - z = self.flow(z_p, c_mask, g=g, reverse=True) - # MB-iSTFT-VITS - if self.mb: - o, o_mb = self.dec(z * c_mask, g=g) + # flow (reverse) + z = self.flow(z_p, c_mask, g=spk, reverse=True) + + # decoder + if "istft" in self.type_: + o, _ = self.dec(z * c_mask, g=spk) + elif "ddsp" in self.type_: + assert volume is not None + o, _, _ = self.dec( + (z * c_mask).transpose(1, 2), + f0.unsqueeze(-1), + volume.transpose(0, 1), + spk.long(), + ) + elif "bigvgan" in self.type_: + o = self.dec(z) else: - o = self.dec(z * c_mask, g=g, f0=f0) + o = self.dec(z * c_mask, g=spk, f0=f0) return o diff --git a/src/so_vits_svc_fork/preprocessing/preprocess_hubert_f0.py b/src/so_vits_svc_fork/preprocessing/preprocess_hubert_f0.py index 4951922f..114f7efd 100644 --- a/src/so_vits_svc_fork/preprocessing/preprocess_hubert_f0.py +++ b/src/so_vits_svc_fork/preprocessing/preprocess_hubert_f0.py @@ -53,6 +53,12 @@ def _process_one( f0 = torch.from_numpy(f0).float() uv = torch.from_numpy(uv).float() + # compute volume + if "ddsp" in hps.model.get("type_"): + from ..modules.decoders.pc_ddsp import VolumeExtractor + + volume = VolumeExtractor().extract(audio) + # 
Compute HuBERT content audio = torch.from_numpy(audio).float().to(device) c = utils.get_content( @@ -80,6 +86,9 @@ def _process_one( uv[:lmin], c[:, :lmin], ) + if "ddsp" in hps.model.get("type_"): + volume = torch.from_numpy(volume).float() + volume = volume[:lmin] # get speaker id spk_name = filepath.parent.name @@ -97,6 +106,8 @@ def _process_one( "audio": audio, "spk": spk, } + if "ddsp" in hps.model.get("type_"): + data["volume"] = volume data = {k: v.cpu() for k, v in data.items()} with data_path.open("wb") as f: torch.save(data, f) diff --git a/src/so_vits_svc_fork/train.py b/src/so_vits_svc_fork/train.py index 736c2031..e5c0a40d 100644 --- a/src/so_vits_svc_fork/train.py +++ b/src/so_vits_svc_fork/train.py @@ -93,6 +93,9 @@ def train( class VitsLightning(pl.LightningModule): + net_g: SynthesizerTrn + net_d: MultiPeriodDiscriminator + def __init__(self, reset_optimizer: bool = False, **hparams: Any): super().__init__() self._temp_epoch = 0 # Add this line to initialize the _temp_epoch attribute @@ -312,7 +315,15 @@ def training_step(self, batch: dict[str, torch.Tensor], batch_idx: int) -> None: # Generator # train self.toggle_optimizer(optim_g) - c, f0, spec, mel, y, g, lengths, uv = batch + c = batch["content"] + f0 = batch["f0"] + spec = batch["spec"] + mel = batch["mel_spec"] + y = batch["audio"] + spk = batch["spk"] + lengths = batch["length"] + uv = batch["uv"] + volume = batch.get("volume", None) ( y_hat, y_hat_mb, @@ -322,7 +333,16 @@ def training_step(self, batch: dict[str, torch.Tensor], batch_idx: int) -> None: pred_lf0, norm_lf0, lf0, - ) = self.net_g(c, f0, uv, spec, g=g, c_lengths=lengths, spec_lengths=lengths) + ) = self.net_g( + c, + f0, + uv, + spec, + spk=spk, + c_lengths=lengths, + spec_lengths=lengths, + volume=volume, + ) y_mel = commons.slice_segments( mel, ids_slice, @@ -439,8 +459,14 @@ def training_step(self, batch: dict[str, torch.Tensor], batch_idx: int) -> None: def validation_step(self, batch, batch_idx): with torch.no_grad(): 
self.net_g.eval() - c, f0, _, mel, y, g, _, uv = batch - y_hat = self.net_g.infer(c, f0, uv, g=g) + c = batch["content"] + f0 = batch["f0"] + mel = batch["mel_spec"] + y = batch["audio"] + uv = batch["uv"] + spk = batch["spk"] + volume = batch.get("volume", None) + y_hat = self.net_g.infer(c, f0, uv, spk=spk, volume=volume) y_hat_mel = mel_spectrogram_torch(y_hat.squeeze(1).float(), self.hparams) self.log_audio_dict( {f"gen/audio_{batch_idx}": y_hat[0], f"gt/audio_{batch_idx}": y[0]} diff --git a/src/so_vits_svc_fork/utils.py b/src/so_vits_svc_fork/utils.py index 441bec41..b4060e4e 100644 --- a/src/so_vits_svc_fork/utils.py +++ b/src/so_vits_svc_fork/utils.py @@ -188,7 +188,7 @@ def _substitute_if_same_shape(to_: dict[str, Any], from_: dict[str, Any]) -> Non shape_missmatch = [] for k, v in from_.items(): if k not in to_: - warnings.warn(f"Key {k} not found in model state dict") + pass elif hasattr(v, "shape"): if not hasattr(to_[k], "shape"): raise ValueError(f"Key {k} is not a tensor")