经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python » 查看文章
基于Pytorch实现的声音分类实例代码
来源:jb51  时间:2022/6/21 12:20:35  对本文有异议

前言

本章我们来介绍如何使用Pytorch训练一个区分不同音频的分类模型,例如你有这样一个需求,需要根据不同的鸟叫声识别是什么种类的鸟,这时你就可以使用这个方法来实现你的需求了。

源码地址:https://github.com/yeyupiaoling/AudioClassification-Pytorch

环境准备

主要介绍libsora,PyAudio,pydub的安装,其他的依赖包根据需要自行安装。

  • Python 3.7
  • Pytorch 1.10.0

安装libsora

最简单的方式就是使用pip命令安装,如下:

  1. pip install pytest-runner
  2. pip install librosa==0.9.1

注意: 如果pip命令安装不成功,那就使用源码安装,下载源码:https://github.com/librosa/librosa/releases/, windows的可以下载zip压缩包,方便解压。

  1. pip install pytest-runner
  2. tar xzf librosa-<版本号>.tar.gz 或者 unzip librosa-<版本号>.tar.gz
  3. cd librosa-<版本号>/
  4. python setup.py install

如果出现 libsndfile64bit.dll': error 0x7e错误,请指定安装版本0.6.3,如 pip install librosa==0.6.3

安装ffmpeg, 下载地址:http://blog.gregzaal.com/how-to-install-ffmpeg-on-windows/,笔者下载的是64位,static版。
然后到C盘,笔者解压,修改文件名为 ffmpeg,存放在 C:\Program Files\目录下,并添加环境变量 C:\Program Files\ffmpeg\bin

最后修改源码,路径为 C:\Python3.7\Lib\site-packages\audioread\ffdec.py,修改32行代码,如下:

  1. COMMANDS = ('C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe', 'avconv')

安装PyAudio

使用pip安装命令,如下:

  1. pip install pyaudio

在安装的时候需要使用到C++库进行编译,如果读者的系统是windows,Python是3.7,可以在这里下载whl安装包,下载地址:https://github.com/intxcc/pyaudio_portaudio/releases

安装pydub

使用pip命令安装,如下:

  1. pip install pydub

训练分类模型

把音频转换成训练数据最重要的是使用了librosa,使用librosa可以很方便得到音频的梅尔频谱(Mel Spectrogram),使用的API为 librosa.feature.melspectrogram(),输出的是numpy值。关于梅尔频谱具体信息读者可以自行了解,跟梅尔频谱同样很重要的梅尔倒谱(MFCCs)更多用于语音识别中,对应的API为 librosa.feature.mfcc()。同样以下的代码,就可以获取到音频的梅尔频谱。

  1. wav, sr = librosa.load(data_path, sr=16000)
  2. features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400)
  3. features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)

生成数据列表

生成数据列表,用于下一步的读取需要,audio_path为音频文件路径,用户需要提前把音频数据集存放在dataset/audio目录下,每个文件夹存放一个类别的音频数据,每条音频数据长度在3秒以上,如 dataset/audio/鸟叫声/······audio是数据列表存放的位置,生成的数据类别的格式为 音频路径\t音频对应的类别标签,音频路径和标签用制表符 \t分开。读者也可以根据自己存放数据的方式修改以下函数。

Urbansound8K 是目前应用较为广泛的用于自动城市环境声分类研究的公共数据集,包含10个分类:空调声、汽车鸣笛声、儿童玩耍声、狗叫声、钻孔声、引擎空转声、枪声、手提钻、警笛声和街道音乐声。数据集下载地址:https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz。以下是针对Urbansound8K生成数据列表的函数。如果读者想使用该数据集,请下载并解压到 dataset目录下,把生成数据列表代码改为以下代码。

  1. # 生成数据列表
  2. def get_data_list(audio_path, list_path):
  3. sound_sum = 0
  4. audios = os.listdir(audio_path)
  5.  
  6. f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
  7. f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')
  8.  
  9. for i in range(len(audios)):
  10. sounds = os.listdir(os.path.join(audio_path, audios[i]))
  11. for sound in sounds:
  12. if '.wav' not in sound:continue
  13. sound_path = os.path.join(audio_path, audios[i], sound)
  14. t = librosa.get_duration(filename=sound_path)
  15. # 过滤小于2.1秒的音频
  16. if t >= 2.1:
  17. if sound_sum % 100 == 0:
  18. f_test.write('%s\t%d\n' % (sound_path, i))
  19. else:
  20. f_train.write('%s\t%d\n' % (sound_path, i))
  21. sound_sum += 1
  22. print("Audio:%d/%d" % (i + 1, len(audios)))
  23.  
  24. f_test.close()
  25. f_train.close()
  26.  
  27.  
  28. if __name__ == '__main__':
  29. get_data_list('dataset/UrbanSound8K/audio', 'dataset')

创建 reader.py用于在训练时读取数据。编写一个 CustomDataset类,用读取上一步生成的数据列表。

  1. class CustomDataset(Dataset):
  2. def __init__(self, data_list_path, model='train', sr=16000, chunk_duration=3):
  3. super(CustomDataset, self).__init__()
  4. with open(data_list_path, 'r') as f:
  5. self.lines = f.readlines()
  6. self.model = model
  7. self.sr = sr
  8. self.chunk_duration = chunk_duration
  9.  
  10. def __getitem__(self, idx):
  11. try:
  12. audio_path, label = self.lines[idx].replace('\n', '').split('\t')
  13. spec_mag = load_audio(audio_path, mode=self.model, sr=self.sr, chunk_duration=self.chunk_duration)
  14. return spec_mag, np.array(int(label), dtype=np.int64)
  15. except Exception as ex:
  16. print(f"[{datetime.now()}] 数据: {self.lines[idx]} 出错,错误信息: {ex}", file=sys.stderr)
  17. rnd_idx = np.random.randint(self.__len__())
  18. return self.__getitem__(rnd_idx)
  19.  
  20. def __len__(self):
  21. return len(self.lines)

下面是在训练时或者测试时读取音频数据,训练时对转换的梅尔频谱数据随机裁剪,如果是测试,就取前面的,最好要执行归一化。

  1. def load_audio(audio_path, mode='train', sr=16000, chunk_duration=3):
  2. # 读取音频数据
  3. wav, sr_ret = librosa.load(audio_path, sr=sr)
  4. if mode == 'train':
  5. # 随机裁剪
  6. num_wav_samples = wav.shape[0]
  7. # 数据太短不利于训练
  8. if num_wav_samples < sr:
  9. raise Exception(f'音频长度不能小于1s,实际长度为:{(num_wav_samples / sr):.2f}s')
  10. num_chunk_samples = int(chunk_duration * sr)
  11. if num_wav_samples > num_chunk_samples + 1:
  12. start = random.randint(0, num_wav_samples - num_chunk_samples - 1)
  13. stop = start + num_chunk_samples
  14. wav = wav[start:stop]
  15. # 对每次都满长度的再次裁剪
  16. if random.random() > 0.5:
  17. wav[:random.randint(1, sr // 2)] = 0
  18. wav = wav[:-random.randint(1, sr // 2)]
  19. elif mode == 'eval':
  20. # 为避免显存溢出,只裁剪指定长度
  21. num_wav_samples = wav.shape[0]
  22. num_chunk_samples = int(chunk_duration * sr)
  23. if num_wav_samples > num_chunk_samples + 1:
  24. wav = wav[:num_chunk_samples]
  25. features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400)
  26. features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)
  27. # 归一化
  28. mean = np.mean(features, 0, keepdims=True)
  29. std = np.std(features, 0, keepdims=True)
  30. features = (features - mean) / (std + 1e-5)
  31. features = features.astype('float32')
  32. return features

训练

接着就可以开始训练模型了,创建 train.py。我们搭建简单的卷积神经网络,如果音频种类非常多,可以适当使用更大的卷积神经网络模型。通过把音频数据转换成梅尔频谱。然后定义优化方法和获取训练和测试数据。要注意 args.num_classes参数的值,这个是类别的数量,要根据你数据集中的分类数量来修改。

  1. def train(args):
  2. # 获取数据
  3. train_dataset = CustomDataset(args.train_list_path, model='train')
  4. train_loader = DataLoader(dataset=train_dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn, num_workers=args.num_workers)
  5.  
  6. test_dataset = CustomDataset(args.test_list_path, model='eval')
  7. test_loader = DataLoader(dataset=test_dataset, batch_size=args.batch_size, collate_fn=collate_fn, num_workers=args.num_workers)
  8. # 获取分类标签
  9. with open(args.label_list_path, 'r', encoding='utf-8') as f:
  10. lines = f.readlines()
  11. class_labels = [l.replace('\n', '') for l in lines]
  12. # 获取模型
  13. device = torch.device("cuda")
  14. model = EcapaTdnn(num_classes=args.num_classes)
  15. model.to(device)
  16.  
  17. # 获取优化方法
  18. optimizer = torch.optim.Adam(params=model.parameters(),
  19. lr=args.learning_rate,
  20. weight_decay=5e-4)
  21. # 获取学习率衰减函数
  22. scheduler = CosineAnnealingLR(optimizer, T_max=args.num_epoch)
  23.  
  24. # 恢复训练
  25. if args.resume is not None:
  26. model.load_state_dict(torch.load(os.path.join(args.resume, 'model.pth')))
  27. state = torch.load(os.path.join(args.resume, 'model.state'))
  28. last_epoch = state['last_epoch']
  29. optimizer_state = torch.load(os.path.join(args.resume, 'optimizer.pth'))
  30. optimizer.load_state_dict(optimizer_state)
  31. print(f'成功加载第 {last_epoch} 轮的模型参数和优化方法参数')
  32.  
  33. # 获取损失函数
  34. loss = torch.nn.CrossEntropyLoss()

最后执行训练,每100个batch打印一次训练日志,训练一轮之后执行测试和保存模型,在测试时,把每个batch的输出都统计,最后求平均值。

  1. for epoch in range(args.num_epoch):
  2. loss_sum = []
  3. accuracies = []
  4. for batch_id, (spec_mag, label) in enumerate(train_loader):
  5. spec_mag = spec_mag.to(device)
  6. label = label.to(device).long()
  7. output = model(spec_mag)
  8. # 计算损失值
  9. los = loss(output, label)
  10. optimizer.zero_grad()
  11. los.backward()
  12. optimizer.step()
  13.  
  14. # 计算准确率
  15. output = torch.nn.functional.softmax(output, dim=-1)
  16. output = output.data.cpu().numpy()
  17. output = np.argmax(output, axis=1)
  18. label = label.data.cpu().numpy()
  19. acc = np.mean((output == label).astype(int))
  20. accuracies.append(acc)
  21. loss_sum.append(los)
  22. if batch_id % 100 == 0:
  23. print(f'[{datetime.now()}] Train epoch [{epoch}/{args.num_epoch}], batch: {batch_id}/{len(train_loader)}, '
  24. f'lr: {scheduler.get_last_lr()[0]:.8f}, loss: {sum(loss_sum) / len(loss_sum):.8f}, '
  25. f'accuracy: {sum(accuracies) / len(accuracies):.8f}')
  26. scheduler.step()

每轮训练结束之后都会执行一次评估,和保存模型。评估会出来输出准确率,还保存了混合矩阵图片,如下。

预测

在训练结束之后,我们得到了一个模型参数文件,我们使用这个模型预测音频,在执行预测之前,需要把音频转换为梅尔频谱数据,最后输出的结果即为预测概率最大的标签。

  1. parser = argparse.ArgumentParser(description=__doc__)
  2. add_arg = functools.partial(add_arguments, argparser=parser)
  3. add_arg('audio_path', str, 'dataset/UrbanSound8K/audio/fold5/156634-5-2-5.wav', '图片路径')
  4. add_arg('num_classes', int, 10, '分类的类别数量')
  5. add_arg('label_list_path', str, 'dataset/label_list.txt', '标签列表路径')
  6. add_arg('model_path', str, 'models/model.pth', '模型保存的路径')
  7. args = parser.parse_args()
  8.  
  9.  
  10. # 获取分类标签
  11. with open(args.label_list_path, 'r', encoding='utf-8') as f:
  12. lines = f.readlines()
  13. class_labels = [l.replace('\n', '') for l in lines]
  14. # 获取模型
  15. device = torch.device("cuda")
  16. model = EcapaTdnn(num_classes=args.num_classes)
  17. model.to(device)
  18. model.load_state_dict(torch.load(args.model_path))
  19. model.eval()
  20.  
  21.  
  22. def infer():
  23. data = load_audio(args.audio_path, mode='infer')
  24. data = data[np.newaxis, :]
  25. data = torch.tensor(data, dtype=torch.float32, device=device)
  26. # 执行预测
  27. output = model(data)
  28. result = torch.nn.functional.softmax(output, dim=-1)
  29. result = result.data.cpu().numpy()
  30. # 显示图片并输出结果最大的label
  31. lab = np.argsort(result)[0][-1]
  32. print(f'音频:{args.audio_path} 的预测结果标签为:{class_labels[lab]}')
  33.  
  34.  
  35. if __name__ == '__main__':
  36. infer()

其他

为了方便读取录制数据和制作数据集,这里提供了两个程序,首先是 record_audio.py,这个用于录制音频,录制的音频帧率为44100,通道为1,16bit。

  1. import pyaudio
  2. import wave
  3. import uuid
  4. from tqdm import tqdm
  5. import os
  6.  
  7. s = input('请输入你计划录音多少秒:')
  8.  
  9. CHUNK = 1024
  10. FORMAT = pyaudio.paInt16
  11. CHANNELS = 1
  12. RATE = 44100
  13. RECORD_SECONDS = int(s)
  14. WAVE_OUTPUT_FILENAME = "save_audio/%s.wav" % str(uuid.uuid1()).replace('-', '')
  15.  
  16. p = pyaudio.PyAudio()
  17.  
  18. stream = p.open(format=FORMAT,
  19. channels=CHANNELS,
  20. rate=RATE,
  21. input=True,
  22. frames_per_buffer=CHUNK)
  23.  
  24. print("开始录音, 请说话......")
  25.  
  26. frames = []
  27.  
  28. for i in tqdm(range(0, int(RATE / CHUNK * RECORD_SECONDS))):
  29. data = stream.read(CHUNK)
  30. frames.append(data)
  31.  
  32. print("录音已结束!")
  33.  
  34. stream.stop_stream()
  35. stream.close()
  36. p.terminate()
  37.  
  38. if not os.path.exists('save_audio'):
  39. os.makedirs('save_audio')
  40.  
  41. wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
  42. wf.setnchannels(CHANNELS)
  43. wf.setsampwidth(p.get_sample_size(FORMAT))
  44. wf.setframerate(RATE)
  45. wf.writeframes(b''.join(frames))
  46. wf.close()
  47.  
  48. print('文件保存在:%s' % WAVE_OUTPUT_FILENAME)
  49. os.system('pause')

创建 crop_audio.py,在训练是只是裁剪前面的3秒的音频,所以我们要把录制的硬盘安装每3秒裁剪一段,把裁剪后音频存放在音频名称命名的文件夹中。最后把这些文件按照训练数据的要求创建数据列表和训练数据。

  1. import os
  2. import uuid
  3. import wave
  4. from pydub import AudioSegment
  5.  
  6.  
  7. # 按秒截取音频
  8. def get_part_wav(sound, start_time, end_time, part_wav_path):
  9. save_path = os.path.dirname(part_wav_path)
  10. if not os.path.exists(save_path):
  11. os.makedirs(save_path)
  12. start_time = int(start_time) * 1000
  13. end_time = int(end_time) * 1000
  14. word = sound[start_time:end_time]
  15. word.export(part_wav_path, format="wav")
  16.  
  17.  
  18. def crop_wav(path, crop_len):
  19. for src_wav_path in os.listdir(path):
  20. wave_path = os.path.join(path, src_wav_path)
  21. print(wave_path[-4:])
  22. if wave_path[-4:] != '.wav':
  23. continue
  24. file = wave.open(wave_path)
  25. # 帧总数
  26. a = file.getparams().nframes
  27. # 采样频率
  28. f = file.getparams().framerate
  29. # 获取音频时间长度
  30. t = int(a / f)
  31. print('总时长为 %d s' % t)
  32. # 读取语音
  33. sound = AudioSegment.from_wav(wave_path)
  34. for start_time in range(0, t, crop_len):
  35. save_path = os.path.join(path, os.path.basename(wave_path)[:-4], str(uuid.uuid1()) + '.wav')
  36. get_part_wav(sound, start_time, start_time + crop_len, save_path)
  37.  
  38.  
  39. if __name__ == '__main__':
  40. crop_len = 3
  41. crop_wav('save_audio', crop_len)

创建 infer_record.py,这个程序是用来不断进行录音识别,录音时间之所以设置为6秒,所以我们可以大致理解为这个程序在实时录音识别。通过这个应该我们可以做一些比较有趣的事情,比如把麦克风放在小鸟经常来的地方,通过实时录音识别,一旦识别到有鸟叫的声音,如果你的数据集足够强大,有每种鸟叫的声音数据集,这样你还能准确识别是那种鸟叫。如果识别到目标鸟类,就启动程序,例如拍照等等。

  1. # 录音参数
  2. CHUNK = 1024
  3. FORMAT = pyaudio.paInt16
  4. CHANNELS = 1
  5. RATE = 44100
  6. RECORD_SECONDS = 6
  7. WAVE_OUTPUT_FILENAME = "infer_audio.wav"
  8.  
  9. # 打开录音
  10. p = pyaudio.PyAudio()
  11. stream = p.open(format=FORMAT,
  12. channels=CHANNELS,
  13. rate=RATE,
  14. input=True,
  15. frames_per_buffer=CHUNK)
  16.  
  17. # 获取录音数据
  18. def record_audio():
  19. print("开始录音......")
  20.  
  21. frames = []
  22. for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
  23. data = stream.read(CHUNK)
  24. frames.append(data)
  25.  
  26. print("录音已结束!")
  27.  
  28. wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
  29. wf.setnchannels(CHANNELS)
  30. wf.setsampwidth(p.get_sample_size(FORMAT))
  31. wf.setframerate(RATE)
  32. wf.writeframes(b''.join(frames))
  33. wf.close()
  34. return WAVE_OUTPUT_FILENAME
  35.  
  36.  
  37. # 预测
  38. def infer(audio_path):
  39. data = load_audio(audio_path, mode='infer')
  40. data = data[np.newaxis, :]
  41. data = torch.tensor(data, dtype=torch.float32, device=device)
  42. # 执行预测
  43. output = model(data)
  44. result = torch.nn.functional.softmax(output, dim=-1)
  45. result = result.data.cpu().numpy()
  46. # 显示图片并输出结果最大的label
  47. lab = np.argsort(result)[0][-1]
  48. return class_labels[lab]
  49.  
  50.  
  51. if __name__ == '__main__':
  52. try:
  53. while True:
  54. # 加载数据
  55. audio_path = record_audio()
  56. # 获取预测结果
  57. label = infer(audio_path)
  58. print(f'预测的标签为:{label}')
  59. except Exception as e:
  60. print(e)
  61. stream.stop_stream()
  62. stream.close()
  63. p.terminate()

总结

到此这篇关于基于Pytorch实现声音分类的文章就介绍到这了,更多相关Pytorch实现声音分类内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号