语音的预处理--端点检测

引言

语音的实际应用场景中,经常是给定一段包含多句句子的长语音,这就产生了语音端点检测的需求,从而实现对句子的分割。端点检测可以是只检测长语音的开始和结束,也可以细化到每一句句子的开始和结束,以下示例为句子级的端点检测。

方法1

使用短时能量和谱质心特征进行端点检测,在matlab上有封装好的函数,以下为python版本。

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @author: fan weiquan

import os
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
import librosa
import scipy.signal


def ShortTimeEnergy(signal, windowLength, step):
    """
    计算短时能量
    Parameters
    ----------
    signal : 原始信号.
    windowLength : 帧长.
    step : 帧移.

    Returns
    -------
    E : 每一帧的能量.
    """
    signal = signal / np.max(signal) # 归一化
    curPos = 0
    L = len(signal)
    numOfFrames  = np.asarray(np.floor((L-windowLength)/step) + 1, dtype=int)
    E = np.zeros((numOfFrames, 1))
    for i in range(numOfFrames):
        window = signal[int(curPos):int(curPos+windowLength-1)];
        E[i] = (1/(windowLength)) * np.sum(np.abs(window**2));
        curPos = curPos + step;
    return E

def SpectralCentroid(signal,windowLength, step, fs):
    """
    计算谱质心
    Parameters
    ----------
    signal : 原始信号.
    windowLength : 帧长.
    step : 帧移.
    fs : 采样率.

    Returns
    -------
    C : 每一帧的谱质心.
    """
    signal = signal / np.max(signal) # 归一化
    curPos = 0
    L = len(signal)
    numOfFrames  = np.asarray(np.floor((L - windowLength) / step) + 1, dtype=int)
    H = np.hamming(windowLength)
    m = ((fs / (2 * windowLength)) * np.arange(1, windowLength, 1)).T
    C = np.zeros((numOfFrames, 1))
    for i in range(numOfFrames):
        window = H * (signal[int(curPos) : int(curPos + windowLength)])
        FFT = np.abs(np.fft.fft(window, 2 * int(windowLength)))
        FFT = FFT[1 : windowLength]
        FFT = FFT / np.max(FFT)
        C[i] = np.sum(m * FFT) / np.sum(FFT)
        if np.sum(window**2) < 0.010:
            C[i] = 0.0
        curPos = curPos + step;
    C = C / (fs/2)
    return C

def findMaxima(f, step):
    """
    寻找局部最大值
    Parameters
    ----------
    f : 输入序列.
    step : 搜寻窗长.

    Returns
    -------
    Maxima : 最大值索引 最大值
    countMaxima : 最大值的数量
    """
    ## STEP 1: 寻找最大值
    countMaxima = 0
    Maxima = []
    for i in range(len(f) - step - 1): # 对于序列中的每一个元素:
        if i >= step:
            if (np.mean(f[i - step : i]) < f[i]) and (np.mean(f[i + 1 : i + step + 1]) < f[i]): 
                # IF the current element is larger than its neighbors (2*step window)
                # --> keep maximum:
                countMaxima = countMaxima + 1
                Maxima.append([i, f[i]])
        else:
            if (np.mean(f[0 : i + 1]) <= f[i]) and (np.mean(f[i + 1 : i + step + 1]) < f[i]):
                # IF the current element is larger than its neighbors (2*step window)
                # --> keep maximum:
                countMaxima = countMaxima + 1
                Maxima.append([i, f[i]])

    ## STEP 2: 对最大值进行进一步处理
    MaximaNew = []
    countNewMaxima = 0
    i = 0
    while i < countMaxima:
        # get current maximum:

        curMaxima = Maxima[i][0]
        curMavVal = Maxima[i][1]

        tempMax = [Maxima[i][0]]
        tempVals = [Maxima[i][1]]
        i = i + 1

        # search for "neighbourh maxima":
        while (i < countMaxima) and (Maxima[i][0] - tempMax[len(tempMax) - 1] < step / 2):

            tempMax.append(Maxima[i][0])
            tempVals.append(Maxima[i][1])
            i = i + 1

        MM = np.max(tempVals)
        MI = np.argmax(tempVals) 
        if MM > 0.02 * np.mean(f): # if the current maximum is "large" enough:
            # keep the maximum of all maxima in the region:
            MaximaNew.append([tempMax[MI], f[tempMax[MI]]])
            countNewMaxima = countNewMaxima + 1   # add maxima
    Maxima = MaximaNew
    countMaxima = countNewMaxima

    return Maxima, countMaxima

def VAD(signal, fs):

    win = 0.05
    step = 0.05
    Eor = ShortTimeEnergy(signal, int(win * fs), int(step * fs));
    Cor = SpectralCentroid(signal, int(win * fs), int(step * fs), fs);
    E = scipy.signal.medfilt(Eor[:, 0], 5)
    E = scipy.signal.medfilt(E, 5)
    C = scipy.signal.medfilt(Cor[:, 0], 5)
    C = scipy.signal.medfilt(C, 5)

    E_mean = np.mean(E);
    Z_mean = np.mean(C);
    Weight = 100 # 阈值估计的参数
    # 寻找短时能量的阈值
    Hist = np.histogram(E, bins=10) # 计算直方图
    HistE = Hist[0]
    X_E = Hist[1]
    MaximaE, countMaximaE = findMaxima(HistE, 3) # 寻找直方图的局部最大值
    if len(MaximaE) >= 2: # 如果找到了两个以上局部最大值
        T_E = (Weight*X_E[MaximaE[0][0]] + X_E[MaximaE[1][0]]) / (Weight + 1)
    else:
        T_E = E_mean / 2

    # 寻找谱质心的阈值
    Hist = np.histogram(C, bins=10)
    HistC = Hist[0]
    X_C = Hist[1]
    MaximaC, countMaximaC = findMaxima(HistC, 3)
    if len(MaximaC)>=2:
        T_C = (Weight*X_C[MaximaC[0][0]]+X_C[MaximaC[1][0]]) / (Weight+1)
    else:
        T_C = Z_mean / 2

    # 阈值判断
    Flags1 = (E>=T_E)
    Flags2 = (C>=T_C)
    flags = np.array(Flags1 & Flags2, dtype=int)

    ## 提取语音片段
    count = 1
    segments = []
    while count < len(flags): # 当还有未处理的帧时
        # 初始化
        curX = []
        countTemp = 1
        while ((flags[count - 1] == 1) and (count < len(flags))):
            if countTemp == 1: # 如果是该语音段的第一帧
                Limit1 = np.round((count-1)*step*fs)+1 # 设置该语音段的开始边界
                if Limit1 < 1:
                    Limit1 = 1
            count = count + 1       # 计数器加一
            countTemp = countTemp + 1   # 当前语音段的计数器加一

        if countTemp > 1: # 如果当前循环中有语音段
            Limit2 = np.round((count - 1) * step * fs) # 设置该语音段的结束边界
            if Limit2 > len(signal):
                Limit2 = len(signal)
            # 将该语音段的首尾位置加入到segments的最后一行
            segments.append([int(Limit1), int(Limit2)])
        count = count + 1

    # 合并语音段
    i = 0
    while i < len(segments)-1:
    # for i in range(len(segments) - 1): # 对每一个语音段进行处理
        if segments[i][1] >= segments[i + 1][0] - fs//2:
            segments[i][1] = segments[i + 1][1]
            # segments[i + 1, :] = []
            del segments[i+1]
            i -= 1
        i += 1

    # 删除语音段
    i = 0
    while i < len(segments): 
        if segments[i][1] - segments[i][0] < fs//2:
            del segments[i]
        i += 1

    # seg_tuple = ()
    # for i in range(len(segments)): seg_tuple+=tuple(segments[i][:])

    return segments


if __name__ == "__main__":
    path_audio = 'test.wav'

    signal, fs = librosa.load(path_audio, mono=True)

    segments = VAD(signal, fs) # 端点检测
    print(segments)
    # for i, seg in enumerate(segments):
    #     wavfile.write(str(i)+'.wav', fs, signal[seg[0]:seg[1]])

方法2

基于webrtcvad的端点检测,逐帧判断是否是否静音,并通过平滑扩张等操作进一步调整端点。

from scipy.ndimage.morphology import binary_dilation
import librosa
import numpy as np
import struct
import webrtcvad
import soundfile as sf

# ** a的b次方  32767
int16_max = (2 ** 15) - 1
#输入
wav, sr = librosa.load("test.wav", sr=None)

# 计算语音检测窗口大小  #为整除 30秒X16000=总帧长
samples_per_window = (30 * 16000) // 1000

# 修剪音频的结尾,使其具有窗口大小的倍数。使wav的长度能被 samples_per_window整除
wav = wav[:len(wav) - (len(wav) % samples_per_window)]

# 浮点数波形转换为16位单声道PCM  *:接收到的参数会形成一个元组,**:接收到的参数会形成一个字典。如下代码。
# webrtcvad 的 is_speech 接收的是buf 所以这里需要转换
pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16))

# 执行语音激活检测
# timestamps = []
# flag = False
voice_flags = []
#  这里共有三种帧长可以用到,分别是80/10ms,160/20ms,240/30ms。其它采样率
# 的48k,32k,24k,16k会重采样到8k来计算VAD。之所以选择上述三种帧长度,是因为语
# 音信号是短时平稳信号,其在10ms~30ms之间可看成平稳信号,高斯马尔科夫等比较
# 的信号处理方法基于的前提是信号是平稳的,在10ms~30ms,平稳信号处理方法是可
# 以使用的。
#   从vad的代码中可以看出,实际上,系统只处理默认10ms,20ms,30ms长度的数据,
# 其它长度的数据没有支持,笔者修改过可以支持其它在10ms-30ms之间长度的帧长度
# 发现也是可以的。
#   vad检测共四种模式,用数字0~3来区分,激进程度与数值大小正相关。
# 0: Normal,1:low Bitrate, 2:Aggressive;3:Very Aggressive 可以根据实际的使用
vad = webrtcvad.Vad(mode=0)
for window_start in range(0, len(wav), samples_per_window):
    window_end = window_start + samples_per_window
    # append 进来的都是Boolean  这里以samples_per_windowx2 的长度去检测是否为人声
    isspeech_bool = vad.is_speech(pcm_wave[window_start * 2:window_end * 2], sample_rate=16000)
    voice_flags.append(isspeech_bool)
    # if flag ^ isspeech_bool:
        # timestamps.append(window_start)
        # flag = isspeech_bool

# for i in range(0, len(timestamps), 2):
    # tmp = wav[timestamps[i]:timestamps[i+1]]
    # sf.write(str(i)+".wav", tmp.astype(np.float32), sr, subtype='PCM_24')

voice_flags = np.array(voice_flags)
# 𝑣_𝑏𝑖𝑎𝑠𝑒𝑑𝑡=𝑣𝑡/(1−𝛽𝑡)
# 滑动平均计算
def moving_average(array, width):
    # 拼接 bool 二值化
    # width 执行滑动平均平滑时,帧的平均数。
    # 该值越大,VAD变化必须越大才能平滑。
    array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
    # 一维数组累加
    ret = np.cumsum(array_padded, dtype=float)
    ret[width:] = ret[width:] - ret[:-width]
    return ret[width - 1:] / width

#滑动平均计算
audio_mask = moving_average(voice_flags, 8)
#将平均数四舍五入 转bool
audio_mask = np.round(audio_mask).astype(np.bool)
# 扩张浊音区 使用多维二元膨胀 是数学形态学的方法 类似opencv 也有开闭运算 腐蚀膨胀
audio_mask = binary_dilation(audio_mask, np.ones(6 + 1))
#使其与wav一样大小
audio_mask = np.repeat(audio_mask, samples_per_window)

timestamps = []
flag = False
for i, t in enumerate(audio_mask):
    if flag ^ t:
        timestamps.append(i)
        flag = t

for i in range(0, len(timestamps)-1, 2):
    res = wav[timestamps[i]:timestamps[i+1]]
    sf.write(str(i)+".wav", res.astype(np.float32), sr, subtype='PCM_24')

# #通过这个遮罩扣掉没有声音那部分
# res=wav[audio_mask == True]
# sf.write("out.wav", res.astype(np.float32), sr, subtype='PCM_24')

总结

端点检测方法非常多,除这之外还有比较经典的双门限法。实际测试中,这些方法在干净的语音中效果非常好,在现实带噪场景中,可能容易出现误分成很多子段的现象,为此,可以通过增加时间规则,对分割后的时间段和上下段的连接段判断是否合理,来精简检测结果。

参考

https://blog.csdn.net/qq_42688495/article/details/109333598
https://blog.csdn.net/weixin_43928944/article/details/108378413