GitHub Repository: prophesier/diff-svc
Path: blob/main/preprocessing/data_gen_utils.py
from io import BytesIO
import json
import os
import re
import struct
import warnings
from collections import OrderedDict

import librosa
import numpy as np
import parselmouth
import pyloudnorm as pyln
import resampy
import torch
import torchcrepe
import webrtcvad
from scipy.ndimage import binary_dilation  # scipy.ndimage.morphology is deprecated
from skimage.transform import resize

from utils import audio
from utils.pitch_utils import f0_to_coarse
from utils.text_encoder import TokenTextEncoder

warnings.filterwarnings("ignore")
PUNCS = '!,.?;:'

int16_max = (2 ** 15) - 1  # maximum value of a signed 16-bit sample

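# A minimal sketch of the float -> 16-bit PCM conversion that `int16_max`
# supports (illustration only: `_float_to_pcm16` is a hypothetical helper
# mirroring the struct.pack conversion used in trim_long_silences below;
# the explicit clipping is an added safety assumption):
def _float_to_pcm16(wav):
    # Scale floats in [-1.0, 1.0] to signed 16-bit integers, as webrtcvad expects.
    return np.round(np.clip(wav, -1.0, 1.0) * int16_max).astype(np.int16)
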
def trim_long_silences(path, sr=None, return_raw_wav=False, norm=True, vad_max_silence_length=12):
    """
    Ensures that segments without voice in the waveform remain no longer than a
    threshold determined by the VAD parameters below.
    :param path: path (or file-like object) of the audio file to load
    :param sr: sampling rate to load the audio with; None keeps the file's native rate
    :param return_raw_wav: if True, return the untrimmed waveform together with the mask
    :param norm: if True, loudness-normalize the waveform to -20 LUFS (BS.1770)
    :param vad_max_silence_length: maximum number of consecutive silent frames a segment can have
    :return: the same waveform with silences trimmed away (length <= original wav length)
    """

    ## Voice Activity Detection
    # Window size of the VAD. Must be either 10, 20 or 30 milliseconds.
    # This sets the granularity of the VAD. Should not need to be changed.
    sampling_rate = 16000
    wav_raw, sr = librosa.core.load(path, sr=sr)

    if norm:
        meter = pyln.Meter(sr)  # create BS.1770 meter
        loudness = meter.integrated_loudness(wav_raw)
        wav_raw = pyln.normalize.loudness(wav_raw, loudness, -20.0)
        if np.abs(wav_raw).max() > 1.0:
            wav_raw = wav_raw / np.abs(wav_raw).max()

    # keyword arguments for compatibility with librosa >= 0.10
    wav = librosa.resample(wav_raw, orig_sr=sr, target_sr=sampling_rate, res_type='kaiser_best')

    vad_window_length = 30  # In milliseconds
    # Number of frames to average together when performing the moving average smoothing.
    # The larger this value, the larger the VAD variations must be to not get smoothed out.
    vad_moving_average_width = 8

    # Compute the voice detection window size
    samples_per_window = (vad_window_length * sampling_rate) // 1000

    # Trim the end of the audio to have a multiple of the window size
    wav = wav[:len(wav) - (len(wav) % samples_per_window)]

    # Convert the float waveform to 16-bit mono PCM
    pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16))

    # Perform voice activity detection
    voice_flags = []
    vad = webrtcvad.Vad(mode=3)
    for window_start in range(0, len(wav), samples_per_window):
        window_end = window_start + samples_per_window
        voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2],
                                         sample_rate=sampling_rate))
    voice_flags = np.array(voice_flags)

    # Smooth the voice detection with a moving average
    def moving_average(array, width):
        array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
        ret = np.cumsum(array_padded, dtype=float)
        ret[width:] = ret[width:] - ret[:-width]
        return ret[width - 1:] / width

    audio_mask = moving_average(voice_flags, vad_moving_average_width)
    audio_mask = np.round(audio_mask).astype(bool)  # np.bool is removed in recent NumPy

    # Dilate the voiced regions
    audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1))
    audio_mask = np.repeat(audio_mask, samples_per_window)
    audio_mask = resize(audio_mask, (len(wav_raw),)) > 0
    if return_raw_wav:
        return wav_raw, audio_mask, sr
    return wav_raw[audio_mask], audio_mask, sr

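# A minimal usage sketch for `trim_long_silences` (illustration only:
# 'example.wav' is a hypothetical path, not shipped with this repo):
def _demo_trim_long_silences():
    # Returns the loudness-normalized waveform with long silences removed,
    # plus the per-sample voiced mask and the (native) sampling rate.
    trimmed, mask, sr = trim_long_silences('example.wav', sr=None, norm=True)
    print(f'kept {mask.mean():.1%} of samples at {sr} Hz')
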
def process_utterance(wav_path,
                      fft_size=1024,
                      hop_size=256,
                      win_length=1024,
                      window="hann",
                      num_mels=80,
                      fmin=80,
                      fmax=7600,
                      eps=1e-6,
                      sample_rate=22050,
                      loud_norm=False,
                      min_level_db=-100,
                      return_linear=False,
                      trim_long_sil=False,
                      vocoder='pwg'):
    if isinstance(wav_path, (str, BytesIO)):
        if trim_long_sil:
            wav, _, _ = trim_long_silences(wav_path, sample_rate)
        else:
            wav, _ = librosa.core.load(wav_path, sr=sample_rate)
    else:
        wav = wav_path
    if loud_norm:
        meter = pyln.Meter(sample_rate)  # create BS.1770 meter
        loudness = meter.integrated_loudness(wav)
        wav = pyln.normalize.loudness(wav, loudness, -22.0)
        if np.abs(wav).max() > 1:
            wav = wav / np.abs(wav).max()

    # get amplitude spectrogram
    x_stft = librosa.stft(wav, n_fft=fft_size, hop_length=hop_size,
                          win_length=win_length, window=window, pad_mode="constant")
    spc = np.abs(x_stft)  # (n_bins, T)

    # get mel basis
    fmin = 0 if fmin == -1 else fmin
    fmax = sample_rate / 2 if fmax == -1 else fmax
    # keyword arguments for compatibility with librosa >= 0.10
    mel_basis = librosa.filters.mel(sr=sample_rate, n_fft=fft_size, n_mels=num_mels, fmin=fmin, fmax=fmax)
    mel = mel_basis @ spc

    if vocoder == 'pwg':
        mel = np.log10(np.maximum(eps, mel))  # (n_mel_bins, T)
    else:
        assert False, f'"{vocoder}" is not in ["pwg"].'

    l_pad, r_pad = audio.librosa_pad_lr(wav, fft_size, hop_size, 1)
    wav = np.pad(wav, (l_pad, r_pad), mode='constant', constant_values=0.0)
    wav = wav[:mel.shape[1] * hop_size]

    if not return_linear:
        return wav, mel
    else:
        spc = audio.amp_to_db(spc)
        spc = audio.normalize(spc, {'min_level_db': min_level_db})
        return wav, mel, spc

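# A minimal usage sketch for `process_utterance` (illustration only:
# 'example.wav' is a hypothetical path, not part of this repo). With the
# defaults above, `mel` is a (num_mels, T) log10-mel spectrogram and `wav`
# is padded and then cropped so that it is frame-aligned with `mel`:
def _demo_process_utterance():
    wav, mel = process_utterance('example.wav', fft_size=1024, hop_size=256,
                                 num_mels=80, sample_rate=22050, vocoder='pwg')
    print(wav.shape, mel.shape)  # len(wav) <= mel.shape[1] * hop_size
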
def get_pitch_parselmouth(wav_data, mel, hparams):
    """
    :param wav_data: [T]
    :param mel: [T, 80]
    :param hparams: hyperparameter dict; uses 'hop_size', 'audio_sample_rate', 'f0_min', 'f0_max'
    :return: f0 and its coarse (bucketed) version, both aligned to the mel frames
    """
    time_step = hparams['hop_size'] / hparams['audio_sample_rate']
    f0_min = hparams['f0_min']
    f0_max = hparams['f0_max']

    f0 = parselmouth.Sound(wav_data, hparams['audio_sample_rate']).to_pitch_ac(
        time_step=time_step, voicing_threshold=0.6,
        pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency']
    # mel and f0 are extracted by two different libraries, so force them to the
    # same length. Note: newer versions of some libraries can make the right pad
    # negative; to be safe, set up the same environment as in requirements_auto.txt
    # (with Anaconda).
    pad_size = (int(len(wav_data) // hparams['hop_size']) - len(f0) + 1) // 2
    f0 = np.pad(f0, [[pad_size, len(mel) - len(f0) - pad_size]], mode='constant')
    pitch_coarse = f0_to_coarse(f0, hparams)
    return f0, pitch_coarse

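# A minimal usage sketch for `get_pitch_parselmouth` (illustration only: the
# hparams values are hypothetical and must match those used to extract `mel`;
# `f0_to_coarse` may also read keys such as 'f0_bin' from hparams, depending
# on utils.pitch_utils):
def _demo_get_pitch_parselmouth(wav, mel):
    hparams = {'hop_size': 512, 'audio_sample_rate': 44100,
               'f0_min': 40, 'f0_max': 1100, 'f0_bin': 256}
    f0, pitch_coarse = get_pitch_parselmouth(wav, mel, hparams)
    return f0, pitch_coarse  # one value per mel frame
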
def get_pitch_crepe(wav_data, mel, hparams, threshold=0.05):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # CREPE only supports a 16 kHz sampling rate, so resample first
    wav16k = resampy.resample(wav_data, hparams['audio_sample_rate'], 16000)
    wav16k_torch = torch.FloatTensor(wav16k).unsqueeze(0).to(device)

    # Frequency range
    f0_min = hparams['f0_min']
    f0_max = hparams['f0_max']

    # After resampling, analyze f0 with hop_size=80, i.e. one frame every 5 ms
    f0, pd = torchcrepe.predict(wav16k_torch, 16000, 80, f0_min, f0_max, pad=True, model='full', batch_size=1024,
                                device=device, return_periodicity=True)

    # Filter, remove silence, and apply the unvoiced threshold (see the torchcrepe README)
    pd = torchcrepe.filter.median(pd, 3)
    pd = torchcrepe.threshold.Silence(-60.)(pd, wav16k_torch, 16000, 80)
    f0 = torchcrepe.threshold.At(threshold)(f0, pd)
    f0 = torchcrepe.filter.mean(f0, 3)

    # Convert NaN frequencies (unvoiced frames) to 0
    f0 = torch.where(torch.isnan(f0), torch.full_like(f0, 0), f0)

    # Debug dump of the f0 track, kept from the original author:
    # np.savetxt('问棋-crepe.csv', np.array([0.005 * np.arange(len(f0[0])), f0[0].cpu().numpy()]).transpose(), delimiter=',')

    # Drop zero-frequency (unvoiced) frames and linearly interpolate over them
    nzindex = torch.nonzero(f0[0]).squeeze()
    f0 = torch.index_select(f0[0], dim=0, index=nzindex).cpu().numpy()
    time_org = 0.005 * nzindex.cpu().numpy()
    time_frame = np.arange(len(mel)) * hparams['hop_size'] / hparams['audio_sample_rate']
    if f0.shape[0] == 0:
        f0 = np.zeros(time_frame.shape[0])  # numpy zeros, consistent with the interpolated branch
        print('f0 all zero!')
    else:
        f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])
    pitch_coarse = f0_to_coarse(f0, hparams)
    return f0, pitch_coarse

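# A minimal usage sketch for `get_pitch_crepe` (illustration only: the hparams
# values are hypothetical; CREPE falls back to CPU when CUDA is unavailable,
# but runs much slower there):
def _demo_get_pitch_crepe(wav, mel):
    hparams = {'hop_size': 512, 'audio_sample_rate': 44100,
               'f0_min': 40, 'f0_max': 1100, 'f0_bin': 256}
    f0, pitch_coarse = get_pitch_crepe(wav, mel, hparams, threshold=0.05)
    return f0, pitch_coarse  # f0 is interpolated through unvoiced regions
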
def remove_empty_lines(text):
    """remove empty lines"""
    assert len(text) > 0
    assert isinstance(text, list)
    text = [t.strip() for t in text]
    # filter out every empty line, not just the first occurrence
    text = [t for t in text if t != ""]
    return text

class TextGrid(object):
    def __init__(self, text):
        text = remove_empty_lines(text)
        self.text = text
        self.line_count = 0
        self._get_type()
        self._get_time_intval()
        self._get_size()
        self.tier_list = []
        self._get_item_list()

    def _extract_pattern(self, pattern, inc):
        """
        Parameters
        ----------
        pattern : regex pattern to extract info with
        inc : increment of the line count after extraction
        Returns
        -------
        group : the extracted info
        """
        try:
            group = re.match(pattern, self.text[self.line_count]).group(1)
            self.line_count += inc
        except AttributeError:
            raise ValueError("File format error at line %d:%s" % (self.line_count, self.text[self.line_count]))
        return group

    def _get_type(self):
        self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2)

    def _get_time_intval(self):
        self.xmin = self._extract_pattern(r"xmin = (.*)", 1)
        self.xmax = self._extract_pattern(r"xmax = (.*)", 2)

    def _get_size(self):
        self.size = int(self._extract_pattern(r"size = (.*)", 2))

    def _get_item_list(self):
        """Only supports IntervalTier currently"""
        for itemIdx in range(1, self.size + 1):
            tier = OrderedDict()
            item_list = []
            tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1)
            tier_class = self._extract_pattern(r"class = \"(.*)\"", 1)
            if tier_class != "IntervalTier":
                raise NotImplementedError("Only IntervalTier class is supported currently")
            tier_name = self._extract_pattern(r"name = \"(.*)\"", 1)
            tier_xmin = self._extract_pattern(r"xmin = (.*)", 1)
            tier_xmax = self._extract_pattern(r"xmax = (.*)", 1)
            tier_size = self._extract_pattern(r"intervals: size = (.*)", 1)
            for i in range(int(tier_size)):
                item = OrderedDict()
                item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1)
                item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1)
                item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1)
                item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1)
                item_list.append(item)
            tier["idx"] = tier_idx
            tier["class"] = tier_class
            tier["name"] = tier_name
            tier["xmin"] = tier_xmin
            tier["xmax"] = tier_xmax
            tier["size"] = tier_size
            tier["items"] = item_list
            self.tier_list.append(tier)

    def toJson(self):
        _json = OrderedDict()
        _json["file_type"] = self.file_type
        _json["xmin"] = self.xmin
        _json["xmax"] = self.xmax
        _json["size"] = self.size
        _json["tiers"] = self.tier_list
        return json.dumps(_json, ensure_ascii=False, indent=2)

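# A minimal usage sketch for `TextGrid` (illustration only: 'example.TextGrid'
# is a hypothetical path to a Praat alignment file with IntervalTier tiers):
def _demo_textgrid_to_json():
    with open('example.TextGrid', 'r', encoding='utf-8') as f:
        lines = f.readlines()
    tg = TextGrid(remove_empty_lines(lines))
    return json.loads(tg.toJson())  # dict with 'file_type', 'xmin', 'xmax', 'size', 'tiers'
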
def get_mel2ph(tg_fn, ph, mel, hparams):
    ph_list = ph.split(" ")
    with open(tg_fn, "r", encoding='utf-8') as f:
        tg = f.readlines()
    tg = remove_empty_lines(tg)
    tg = TextGrid(tg)
    tg = json.loads(tg.toJson())
    split = np.ones(len(ph_list) + 1, float) * -1  # np.float is removed in recent NumPy
    tg_idx = 0
    ph_idx = 0
    tg_align = [x for x in tg['tiers'][-1]['items']]
    tg_align_ = []
    for x in tg_align:
        x['xmin'] = float(x['xmin'])
        x['xmax'] = float(x['xmax'])
        if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']:
            x['text'] = ''
            if len(tg_align_) > 0 and tg_align_[-1]['text'] == '':
                tg_align_[-1]['xmax'] = x['xmax']
                continue
        tg_align_.append(x)
    tg_align = tg_align_
    tg_len = len([x for x in tg_align if x['text'] != ''])
    ph_len = len([x for x in ph_list if not is_sil_phoneme(x)])
    assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, tg_fn)
    while tg_idx < len(tg_align) or ph_idx < len(ph_list):
        if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]):
            split[ph_idx] = 1e8
            ph_idx += 1
            continue
        x = tg_align[tg_idx]
        if x['text'] == '' and ph_idx == len(ph_list):
            tg_idx += 1
            continue
        assert ph_idx < len(ph_list), (tg_len, ph_len, tg_align, ph_list, tg_fn)
        ph = ph_list[ph_idx]
        if x['text'] == '' and not is_sil_phoneme(ph):
            assert False, (ph_list, tg_align)
        if x['text'] != '' and is_sil_phoneme(ph):
            ph_idx += 1
        else:
            assert (x['text'] == '' and is_sil_phoneme(ph)) \
                   or x['text'].lower() == ph.lower() \
                   or x['text'].lower() == 'sil', (x['text'], ph)
            split[ph_idx] = x['xmin']
            if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(ph_list[ph_idx - 1]):
                split[ph_idx - 1] = split[ph_idx]
            ph_idx += 1
            tg_idx += 1
    assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align])
    assert ph_idx >= len(ph_list) - 1, (ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], tg_fn)
    mel2ph = np.zeros([mel.shape[0]], int)  # np.int is removed in recent NumPy
    split[0] = 0
    split[-1] = 1e8
    for i in range(len(split) - 1):
        assert split[i] != -1 and split[i] <= split[i + 1], (split[:-1],)
    split = [int(s * hparams['audio_sample_rate'] / hparams['hop_size'] + 0.5) for s in split]
    for ph_idx in range(len(ph_list)):
        mel2ph[split[ph_idx]:split[ph_idx + 1]] = ph_idx + 1
    mel2ph_torch = torch.from_numpy(mel2ph)
    T_t = len(ph_list)
    dur = mel2ph_torch.new_zeros([T_t + 1]).scatter_add(0, mel2ph_torch, torch.ones_like(mel2ph_torch))
    dur = dur[1:].numpy()
    return mel2ph, dur

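# A minimal usage sketch for `get_mel2ph` (illustration only: the path, the
# phoneme string, and the hparams values are hypothetical; silence markers in
# `ph` must be non-alphabetic so that `is_sil_phoneme` recognizes them):
def _demo_get_mel2ph(mel):
    hparams = {'hop_size': 512, 'audio_sample_rate': 44100}
    ph = '<SP> n i <SP>'  # space-separated phonemes matching the TextGrid tier
    mel2ph, dur = get_mel2ph('example.TextGrid', ph, mel, hparams)
    assert dur.sum() == mel.shape[0]  # every mel frame is assigned to a phoneme
    return mel2ph, dur
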
def build_phone_encoder(data_dir):
    phone_list_file = os.path.join(data_dir, 'phone_set.json')
    phone_list = json.load(open(phone_list_file, encoding='utf-8'))
    return TokenTextEncoder(None, vocab_list=phone_list, replace_oov=',')


def is_sil_phoneme(p):
    # a phoneme is treated as silence if it does not start with a letter
    return not p[0].isalpha()

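# A minimal usage sketch for the helpers above (illustration only:
# 'data/processed' is a hypothetical directory that must already contain the
# phone_set.json written during preprocessing, and `encode` is assumed to be
# the usual TokenTextEncoder API from utils.text_encoder):
def _demo_phone_encoder():
    encoder = build_phone_encoder('data/processed')
    ids = encoder.encode('n i')  # phoneme string -> token ids
    print(ids, is_sil_phoneme('<SP>'), is_sil_phoneme('n'))  # ids, True, False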