GitHub Repository: prophesier/diff-svc
Path: blob/main/infer_tools/slicer.py

import time

import numpy as np
import torch
import torchaudio
from scipy.ndimage import maximum_filter1d, uniform_filter1d


def timeit(func):
    # Timing decorator: reports how long the wrapped call took.
    def run(*args, **kwargs):
        t = time.time()
        res = func(*args, **kwargs)
        print("executing '%s' took %.3fs" % (func.__name__, time.time() - t))
        return res

    return run


# @timeit
def _window_maximum(arr, win_sz):
    # Sliding-window maximum, cropped to the positions where the window
    # fully overlaps the signal (output length: arr.shape[0] - win_sz + 1).
    return maximum_filter1d(arr, size=win_sz)[win_sz // 2: win_sz // 2 + arr.shape[0] - win_sz + 1]


# @timeit
def _window_rms(arr, win_sz):
    # Windowed RMS of the locally mean-removed signal, i.e. a sliding
    # standard deviation, computed via Var[x] = E[x^2] - (E[x])^2.
    filtered = np.sqrt(uniform_filter1d(np.power(arr, 2), win_sz) - np.power(uniform_filter1d(arr, win_sz), 2))
    return filtered[win_sz // 2: win_sz // 2 + arr.shape[0] - win_sz + 1]


def level2db(levels, eps=1e-12):
    # Convert linear amplitude levels (0..1] to decibels, clipping at eps
    # to avoid log(0).
    return 20 * np.log10(np.clip(levels, a_min=eps, a_max=1))
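
# A quick sanity check (illustrative, not part of the original file):
# full-scale, -20 dB and -40 dB amplitudes map as expected:
#   level2db(np.array([1.0, 0.1, 0.01]))  ->  [  0., -20., -40.]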


def _apply_slice(audio, begin, end):
    # Slice along the last axis for multi-channel audio, directly for mono.
    if len(audio.shape) > 1:
        return audio[:, begin: end]
    else:
        return audio[begin: end]


class Slicer:
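    """Silence-based slicer for a mono waveform.

    A large max-amplitude window (win_l, in milliseconds) finds regions whose
    peak level stays below db_threshold; a small RMS window (win_s) then
    refines each split point. min_length and max_silence_kept are also in
    milliseconds. slice() maps chunk indices to segment descriptors; an
    illustrative example of the format follows the method below.
    """
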
    def __init__(self,
                 sr: int,
                 db_threshold: float = -40,
                 min_length: int = 5000,
                 win_l: int = 300,
                 win_s: int = 20,
                 max_silence_kept: int = 500):
        self.db_threshold = db_threshold
        # Convert all millisecond parameters to sample counts.
        self.min_samples = round(sr * min_length / 1000)
        self.win_ln = round(sr * win_l / 1000)
        self.win_sn = round(sr * win_s / 1000)
        self.max_silence = round(sr * max_silence_kept / 1000)
        if not self.min_samples >= self.win_ln >= self.win_sn:
            raise ValueError('The following condition must be satisfied: min_length >= win_l >= win_s')
        if not self.max_silence >= self.win_sn:
            raise ValueError('The following condition must be satisfied: max_silence_kept >= win_s')

    @timeit
    def slice(self, audio):
        samples = audio
        if samples.shape[0] <= self.min_samples:
            return {"0": {"slice": False, "split_time": f"0,{len(audio)}"}}
        # get absolute amplitudes
        abs_amp = np.abs(samples - np.mean(samples))
        # calculate local maximum with large window
        win_max_db = level2db(_window_maximum(abs_amp, win_sz=self.win_ln))
        sil_tags = []
        left = right = 0
        while right < win_max_db.shape[0]:
            if win_max_db[right] < self.db_threshold:
                right += 1
            elif left == right:
                left += 1
                right += 1
            else:
                # A silent run [left, right) has just ended: refine its left
                # and right split points with the small RMS window.
                if left == 0:
                    split_loc_l = left
                else:
                    sil_left_n = min(self.max_silence, (right + self.win_ln - left) // 2)
                    rms_db_left = level2db(_window_rms(samples[left: left + sil_left_n], win_sz=self.win_sn))
                    split_win_l = left + np.argmin(rms_db_left)
                    split_loc_l = split_win_l + np.argmin(abs_amp[split_win_l: split_win_l + self.win_sn])
                if len(sil_tags) != 0 and split_loc_l - sil_tags[-1][1] < self.min_samples \
                        and right < win_max_db.shape[0] - 1:
                    # Too close to the previous split point: skip this silence.
                    right += 1
                    left = right
                    continue
                if right == win_max_db.shape[0] - 1:
                    split_loc_r = right + self.win_ln
                else:
                    sil_right_n = min(self.max_silence, (right + self.win_ln - left) // 2)
                    rms_db_right = level2db(_window_rms(samples[right + self.win_ln - sil_right_n: right + self.win_ln],
                                                        win_sz=self.win_sn))
                    split_win_r = right + self.win_ln - sil_right_n + np.argmin(rms_db_right)
                    split_loc_r = split_win_r + np.argmin(abs_amp[split_win_r: split_win_r + self.win_sn])
                sil_tags.append((split_loc_l, split_loc_r))
                right += 1
                left = right
        if left != right:
            # The audio ends inside a silent run.
            sil_left_n = min(self.max_silence, (right + self.win_ln - left) // 2)
            rms_db_left = level2db(_window_rms(samples[left: left + sil_left_n], win_sz=self.win_sn))
            split_win_l = left + np.argmin(rms_db_left)
            split_loc_l = split_win_l + np.argmin(abs_amp[split_win_l: split_win_l + self.win_sn])
            sil_tags.append((split_loc_l, samples.shape[0]))
        if len(sil_tags) == 0:
            return {"0": {"slice": False, "split_time": f"0,{len(audio)}"}}
        else:
            chunks = []
            # If the first silence does not start at the beginning, prepend
            # the leading voiced segment.
            if sil_tags[0][0]:
                chunks.append({"slice": False, "split_time": f"0,{sil_tags[0][0]}"})
            for i in range(0, len(sil_tags)):
                # Mark the voiced segment before this silence (skipped for
                # the first one).
                if i:
                    chunks.append({"slice": False, "split_time": f"{sil_tags[i - 1][1]},{sil_tags[i][0]}"})
                # Mark every silence segment.
                chunks.append({"slice": True, "split_time": f"{sil_tags[i][0]},{sil_tags[i][1]}"})
            # If the last silence does not reach the end, append the trailing
            # segment.
            if sil_tags[-1][1] != len(audio):
                chunks.append({"slice": False, "split_time": f"{sil_tags[-1][1]},{len(audio)}"})
            chunk_dict = {}
            for i in range(len(chunks)):
                chunk_dict[str(i)] = chunks[i]
            return chunk_dict
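
    # Illustrative shape of the mapping returned by slice() (the sample
    # offsets below are made-up example values):
    #   {"0": {"slice": False, "split_time": "0,241664"},
    #    "1": {"slice": True,  "split_time": "241664,268800"},
    #    ...}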


def cut(audio_path, db_thresh=-30, min_len=5000, win_l=300, win_s=20, max_sil_kept=500):
    audio, sr = torchaudio.load(audio_path)
    # torchaudio.load returns (channels, frames); downmix multi-channel
    # audio to mono (the channel count is shape[0]).
    if len(audio.shape) == 2 and audio.shape[0] > 1:
        audio = torch.mean(audio, dim=0).unsqueeze(0)
    audio = audio.cpu().numpy()[0]

    slicer = Slicer(
        sr=sr,
        db_threshold=db_thresh,
        min_length=min_len,
        win_l=win_l,
        win_s=win_s,
        max_silence_kept=max_sil_kept
    )
    chunks = slicer.slice(audio)
    return chunks


def chunks2audio(audio_path, chunks):
    chunks = dict(chunks)
    audio, sr = torchaudio.load(audio_path)
    # Downmix multi-channel audio to mono (channels are dim 0).
    if len(audio.shape) == 2 and audio.shape[0] > 1:
        audio = torch.mean(audio, dim=0).unsqueeze(0)
    audio = audio.cpu().numpy()[0]
    result = []
    for k, v in chunks.items():
        # Each chunk stores "begin,end" sample offsets into the waveform.
        tag = v["split_time"].split(",")
        result.append((v["slice"], audio[int(tag[0]):int(tag[1])]))
    return result, sr
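

# A minimal usage sketch (not part of the original module): slice a file,
# keep the voiced (non-silence) chunks, and write each one out with
# torchaudio.save. The input path 'example.wav' and the output naming
# scheme are hypothetical placeholders.
if __name__ == '__main__':
    chunks = cut('example.wav')
    segments, sr = chunks2audio('example.wav', chunks)
    for i, (is_silence, seg) in enumerate(segments):
        if is_silence:
            continue  # skip the silent chunks
        # torchaudio.save expects a (channels, frames) tensor.
        torchaudio.save(f'chunk_{i}.wav', torch.from_numpy(seg).unsqueeze(0), sr)
        print(f'chunk_{i}.wav: {seg.shape[0] / sr:.2f}s')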