speechbrain.dataio.dataio 模块
数据读取和写入。
- Authors
Mirco Ravanelli 2020
阿库·柔赫 2020
周珏洁 2020
萨穆埃莱·科内尔 2020
阿卜杜勒·赫巴 2020
Gaëlle Laperrière 2021
萨哈尔·加奈 2021
Sylvain de Langen 2022
摘要
类:
逐行写入CSV文件。 |
函数:
创建带有 |
|
将指定张量上的任何填充值设置为mask_value。 |
|
将指定张量上的任何填充值设置为mask_value。 |
|
将一批整数ID转换为字符串标签。 |
|
保留语义概念和值以进行评估。 |
|
获取输入文件的md5校验和。 |
|
为每个序列创建一个二进制掩码。 |
|
加载CSV并格式化字符串值。 |
|
加载JSON并递归格式化字符串值。 |
|
用于加载 .pkl pickle 文件的实用函数。 |
|
加载一个pkl文件。 |
|
将字符序列合并为单词序列。 |
|
将多个csv文件合并为一个文件。 |
|
在开头使用 |
|
通用音频加载,基于自定义符号。 |
|
从文件路径中检索音频元数据。 |
|
基于自定义符号的通用音频加载。 |
|
读取Kaldi格式的标签。 |
|
将SpeechBrain风格的相对长度转换为绝对持续时间。 |
|
将输入文件列表的md5保存为一个pickled字典到文件中。 |
|
以pkl格式保存一个对象。 |
|
将单词序列分割为字符序列。 |
|
将音频写入磁盘。 |
|
将数据写入标准输出。 |
|
以文本格式写入数据。 |
参考
- speechbrain.dataio.dataio.load_data_json(json_path, replacements={})[source]
加载JSON并递归格式化字符串值。
- Parameters:
- Returns:
应用替换后的JSON数据。
- Return type:
Example
>>> json_spec = '''{ ... "ex1": {"files": ["{ROOT}/mic1/ex1.wav", "{ROOT}/mic2/ex1.wav"], "id": 1}, ... "ex2": {"files": [{"spk1": "{ROOT}/ex2.wav"}, {"spk2": "{ROOT}/ex2.wav"}], "id": 2} ... } ... ''' >>> tmpfile = getfixture('tmpdir') / "test.json" >>> with open(tmpfile, "w", encoding="utf-8") as fo: ... _ = fo.write(json_spec) >>> data = load_data_json(tmpfile, {"ROOT": "/home"}) >>> data["ex1"]["files"][0] '/home/mic1/ex1.wav' >>> data["ex2"]["files"][1]["spk2"] '/home/ex2.wav'
- speechbrain.dataio.dataio.load_data_csv(csv_path, replacements={})[source]
加载CSV并格式化字符串值。
使用SpeechBrain传统的CSV数据格式,其中CSV必须有一个'ID'字段。 如果有一个名为duration的字段,它将被解释为浮点数。 其余字段保持不变(传统的_format和_opts字段不用于以任何特殊方式加载数据)。
支持使用$to_replace进行类似Bash的字符串替换。
- Parameters:
- Returns:
应用替换后的CSV数据。
- Return type:
Example
>>> csv_spec = '''ID,duration,wav_path ... utt1,1.45,$data_folder/utt1.wav ... utt2,2.0,$data_folder/utt2.wav ... ''' >>> tmpfile = getfixture("tmpdir") / "test.csv" >>> with open(tmpfile, "w", encoding="utf-8") as fo: ... _ = fo.write(csv_spec) >>> data = load_data_csv(tmpfile, {"data_folder": "/home"}) >>> data["utt1"]["wav_path"] '/home/utt1.wav'
- speechbrain.dataio.dataio.read_audio_info(path) AudioMetaData[source]
从文件路径中检索音频元数据。行为与torchaudio.info完全相同,但尝试修复某些torchaudio版本和编解码器组合下可能损坏的元数据(如帧计数)。
请注意,在某些情况下,这可能会导致完整的文件遍历!
- Parameters:
path (str) – 要检查的音频文件的路径。
- Returns:
与
torchaudio.info返回的值相同,但如果num_frames原本为== 0,则可能会被修正。- Return type:
torchaudio.backend.common.AudioMetaData
注意
一些编解码器,如MP3,需要完整的文件遍历才能获取准确的长度信息。 在这些情况下,您最好读取整个音频文件,以避免处理时间加倍。
- speechbrain.dataio.dataio.read_audio(waveforms_obj)[source]
基于自定义符号的通用音频加载。
预期的使用场景是与由JSON指定的数据集结合使用。
参数可能只是文件的路径:
read_audio("/path/to/wav1.wav")或者,你可以在字典中指定更多选项,例如: ``` # 从样本8000到15999加载文件 read_audio({
“file”: “/path/to/wav2.wav”, “start”: 8000, “stop”: 16000
})
支持的编解码器取决于您的torchaudio后端。 有关更多详细信息,请参阅
torchaudio.load文档。- param waveforms_obj:
音频路径或包含所需配置的字典。
字典变体的键: -
"file"(str): 音频文件的路径。 -"start"(int, 可选): 要加载的第一个样本。 如果未指定,则从第一帧开始加载。 -"stop"(int, 可选): 要加载的最后一个样本(不包括)。 如果未指定或等于start,则从start加载到末尾。 如果stop超过文件的样本数,不会失败,并且会返回较少的帧。- type waveforms_obj:
字符串, 字典
- returns:
1-通道:音频张量,形状为:
(samples, )。 >=2-通道:音频张量,形状为:(samples, channels)。- rtype:
torch.Tensor
Example
>>> dummywav = torch.rand(16000) >>> import os >>> tmpfile = str(getfixture('tmpdir') / "wave.wav") >>> write_audio(tmpfile, dummywav, 16000) >>> asr_example = { "wav": tmpfile, "spk_id": "foo", "words": "foo bar"} >>> loaded = read_audio(asr_example["wav"]) >>> loaded.allclose(dummywav.squeeze(0),atol=1e-4) # replace with eq with sox_io backend True
- speechbrain.dataio.dataio.read_audio_multichannel(waveforms_obj)[source]
基于自定义符号的通用音频加载。
预期的使用场景是与由JSON指定的数据集结合使用。
自定义符号:
注释可以只是文件的路径: “/path/to/wav1.wav”
可以指定多个(可能是多通道的)文件,只要它们具有相同的长度: {“files”: [
“/path/to/wav1.wav”, “/path/to/wav2.wav” ]
}
或者你可以更简洁地指定一个文件: {“files”: “/path/to/wav2.wav”}
偏移样本数和停止样本数也可以指定为仅读取文件中的一段。 {“files”: [
“/path/to/wav1.wav”, “/path/to/wav2.wav” ]
“开始”: 8000 “结束”: 16000 }
- Parameters:
- Returns:
音频张量,形状为:(samples, )。
- Return type:
torch.Tensor
Example
>>> dummywav = torch.rand(16000, 2) >>> import os >>> tmpfile = str(getfixture('tmpdir') / "wave.wav") >>> write_audio(tmpfile, dummywav, 16000) >>> asr_example = { "wav": tmpfile, "spk_id": "foo", "words": "foo bar"} >>> loaded = read_audio(asr_example["wav"]) >>> loaded.allclose(dummywav.squeeze(0),atol=1e-4) # replace with eq with sox_io backend True
- speechbrain.dataio.dataio.write_audio(filepath, audio, samplerate)[source]
将音频写入磁盘。它基本上是一个包装器,用于支持以speechbrain格式(音频,通道)保存音频信号。
- Parameters:
filepath (path) – 保存音频文件的路径。
audio (torch.Tensor) – 音频文件,符合预期的speechbrain格式(信号,通道)。
samplerate (int) – 采样率(例如,16000)。
Example
>>> import os >>> tmpfile = str(getfixture('tmpdir') / "wave.wav") >>> dummywav = torch.rand(16000, 2) >>> write_audio(tmpfile, dummywav, 16000) >>> loaded = read_audio(tmpfile) >>> loaded.allclose(dummywav,atol=1e-4) # replace with eq with sox_io backend True
- speechbrain.dataio.dataio.to_floatTensor(x: (<class 'list'>, <class 'tuple'>, <class 'numpy.ndarray'>))[source]
- speechbrain.dataio.dataio.to_doubleTensor(x: (<class 'list'>, <class 'tuple'>, <class 'numpy.ndarray'>))[source]
- speechbrain.dataio.dataio.to_longTensor(x: (<class 'list'>, <class 'tuple'>, <class 'numpy.ndarray'>))[source]
- speechbrain.dataio.dataio.convert_index_to_lab(batch, ind2lab)[source]
将一批整数ID转换为字符串标签。
- Parameters:
- Returns:
列表的列表,与批次大小相同,标签来自ind2lab。
- Return type:
Example
>>> ind2lab = {1: "h", 2: "e", 3: "l", 4: "o"} >>> out = convert_index_to_lab([[4,1], [1,2,3,3,4]], ind2lab) >>> for seq in out: ... print("".join(seq)) oh hello
- speechbrain.dataio.dataio.relative_time_to_absolute(batch, relative_lens, rate)[source]
将SpeechBrain风格的相对长度转换为绝对持续时间。
在批次级别上操作。
- Parameters:
batch (torch.Tensor) – 用于确定持续时间的序列。
relative_lens (torch.Tensor) – 批次中每个序列的相对长度。批次中最长的序列需要具有相对长度1.0。
rate (float) – 序列元素在现实世界时间中出现的速率。如果批次是原始波形(推荐),则为采样率;如果批次是特征,则为1/frame_shift。这必须以1/s为单位。
- Returns:
每个序列的持续时间(以秒为单位)。
- Return type:
torch.Tensor
Example
>>> batch = torch.ones(2, 16000) >>> relative_lens = torch.tensor([3./4., 1.0]) >>> rate = 16000 >>> print(relative_time_to_absolute(batch, relative_lens, rate)) tensor([0.7500, 1.0000])
- class speechbrain.dataio.dataio.IterativeCSVWriter(outstream, data_fields, defaults={})[source]
基础类:
object逐行写入CSV文件。
- Parameters:
Example
>>> import io >>> f = io.StringIO() >>> writer = IterativeCSVWriter(f, ["phn"]) >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts >>> writer.write("UTT1",2.5,"sil hh ee ll ll oo sil","string","") >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, >>> writer.write(ID="UTT2",phn="sil ww oo rr ll dd sil",phn_format="string") >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, UTT2,,sil ww oo rr ll dd sil,string, >>> writer.set_default('phn_format', 'string') >>> writer.write_batch(ID=["UTT3","UTT4"],phn=["ff oo oo", "bb aa rr"]) >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, UTT2,,sil ww oo rr ll dd sil,string, UTT3,,ff oo oo,string, UTT4,,bb aa rr,string,
- speechbrain.dataio.dataio.write_txt_file(data, filename, sampling_rate=None)[source]
以文本格式写入数据。
- Parameters:
data (str, list, torch.Tensor, numpy.ndarray) – 要写入文本文件的数据。
filename (str) – 写入数据的文件路径。
sampling_rate (None) – 未使用,仅用于接口兼容性。
Example
>>> tmpdir = getfixture('tmpdir') >>> signal=torch.tensor([1,2,3,4]) >>> write_txt_file(signal, tmpdir / 'example.txt')
- speechbrain.dataio.dataio.write_stdout(data, filename=None, sampling_rate=None)[source]
将数据写入标准输出。
- Parameters:
data (str, list, torch.Tensor, numpy.ndarray) – 要写入文本文件的数据。
filename (None) – 未使用,仅用于兼容性。
sampling_rate (None) – 未使用,仅用于兼容性。
Example
>>> tmpdir = getfixture('tmpdir') >>> signal = torch.tensor([[1,2,3,4]]) >>> write_stdout(signal, tmpdir / 'example.txt') [1, 2, 3, 4]
- speechbrain.dataio.dataio.length_to_mask(length, max_len=None, dtype=None, device=None)[source]
为每个序列创建一个二进制掩码。
参考:https://discuss.pytorch.org/t/how-to-generate-variable-length-mask/23397/3
- Parameters:
length (torch.LongTensor) – 包含批次中每个序列的长度。必须是一维的。
max_len (int) – 掩码的最大长度,也是第二维的大小。
dtype (torch.dtype, 默认值: None) – 生成掩码的数据类型。
device (torch.device, default: None) – 用于放置掩码变量的设备。
- Returns:
mask – 二进制掩码。
- Return type:
张量
Example
>>> length=torch.Tensor([1,2,3]) >>> mask=length_to_mask(length) >>> mask tensor([[1., 0., 0.], [1., 1., 0.], [1., 1., 1.]])
- speechbrain.dataio.dataio.read_kaldi_lab(kaldi_ali, kaldi_lab_opts)[source]
读取Kaldi格式的标签。
使用kaldi IO。
- Parameters:
- Returns:
lab – 包含标签的字典。
- Return type:
注意
这取决于kaldi-io-for-python。请单独安装它。 参见:https://github.com/vesis84/kaldi-io-for-python
Example
此示例需要kaldi文件。
` lab_folder = '/home/kaldi/egs/TIMIT/s5/exp/dnn4_pretrain-dbn_dnn_ali' read_kaldi_lab(lab_folder, 'ali-to-pdf') `
- speechbrain.dataio.dataio.get_md5(file)[source]
获取输入文件的md5校验和。
- Parameters:
file (str) – 计算校验和的文件路径。
- Returns:
给定文件路径的校验和。
- Return type:
md5
Example
>>> get_md5('tests/samples/single-mic/example1.wav') 'c482d0081ca35302d30d12f1136c34e5'
- speechbrain.dataio.dataio.save_md5(files, out_file)[source]
将输入文件列表的md5保存为一个pickled字典到文件中。
Example
>>> files = ['tests/samples/single-mic/example1.wav'] >>> tmpdir = getfixture('tmpdir') >>> save_md5(files, tmpdir / "md5.pkl")
- speechbrain.dataio.dataio.save_pkl(obj, file)[source]
以pkl格式保存对象。
Example
>>> tmpfile = getfixture('tmpdir') / "example.pkl" >>> save_pkl([1, 2, 3, 4, 5], tmpfile) >>> load_pkl(tmpfile) [1, 2, 3, 4, 5]
- speechbrain.dataio.dataio.load_pkl(file)[source]
加载一个pkl文件。
例如,参见
save_pkl。- Parameters:
文件 (str) – 输入pkl文件的路径。
- Return type:
加载的对象。
- speechbrain.dataio.dataio.prepend_bos_token(label, bos_index)[source]
在开头使用标记创建标签。
- Parameters:
标签 (torch.IntTensor) – 包含原始标签。大小必须为:[batch_size, max_length]。
bos_index (int) –
标记的索引。
- Returns:
new_label – 新的标签,开头带有
。 - Return type:
张量
Example
>>> label=torch.LongTensor([[1,0,0], [2,3,0], [4,5,6]]) >>> new_label=prepend_bos_token(label, bos_index=7) >>> new_label tensor([[7, 1, 0, 0], [7, 2, 3, 0], [7, 4, 5, 6]])
- speechbrain.dataio.dataio.append_eos_token(label, length, eos_index)[source]
创建带有标记附加的标签。
- Parameters:
标签 (torch.IntTensor) – 包含原始标签。大小必须为:[batch_size, max_length]
length (torch.LongTensor) – 包含每个标签序列的原始长度。必须是一维的。
eos_index (int) –
标记的索引。
- Returns:
new_label – 附加了
的新标签。 - Return type:
张量
Example
>>> label=torch.IntTensor([[1,0,0], [2,3,0], [4,5,6]]) >>> length=torch.LongTensor([1,2,3]) >>> new_label=append_eos_token(label, length, eos_index=7) >>> new_label tensor([[1, 7, 0, 0], [2, 3, 7, 0], [4, 5, 6, 7]], dtype=torch.int32)
- speechbrain.dataio.dataio.merge_char(sequences, space='_')[source]
将字符序列合并为单词序列。
- Parameters:
sequences (list) – 每个项目包含一个列表,这个列表包含一个字符序列。
space (string) – 该标记表示空格。默认值:_
- Return type:
列表包含每个句子的单词序列。
Example
>>> sequences = [["a", "b", "_", "c", "_", "d", "e"], ["e", "f", "g", "_", "h", "i"]] >>> results = merge_char(sequences) >>> results [['ab', 'c', 'de'], ['efg', 'hi']]
- speechbrain.dataio.dataio.merge_csvs(data_folder, csv_lst, merged_csv)[source]
将多个csv文件合并为一个文件。
- Parameters:
data_folder (string) – 用于存储要合并的csv文件以及合并后的文件的文件夹。
csv_lst (list) – 要合并的csv文件的文件名。
merged_csv (string) – 要写入合并的CSV文件的文件名。
Example
>>> tmpdir = getfixture('tmpdir') >>> os.symlink(os.path.realpath("tests/samples/annotation/speech.csv"), tmpdir / "speech.csv") >>> merge_csvs(tmpdir, ... ["speech.csv", "speech.csv"], ... "test_csv_merge.csv")
- speechbrain.dataio.dataio.split_word(sequences, space='_')[source]
将单词序列分割成字符序列。
- Parameters:
sequences (list) – 每个项目包含一个列表,这个列表包含一个单词序列。
space (string) – 该标记表示空格。默认值:_
- Return type:
该列表包含每个句子的单词序列。
Example
>>> sequences = [['ab', 'c', 'de'], ['efg', 'hi']] >>> results = split_word(sequences) >>> results [['a', 'b', '_', 'c', '_', 'd', 'e'], ['e', 'f', 'g', '_', 'h', 'i']]
- speechbrain.dataio.dataio.clean_padding_(tensor, length, len_dim=1, mask_value=0.0)[source]
将指定张量上的任何填充值设置为mask_value。
例如,这可以用于在训练过程中将自动编码器的输出在超过指定长度后清零。
这是一个原地操作
- Parameters:
tensor (torch.Tensor) – 任意维度的张量
length (torch.Tensor) – 一个一维张量的长度
len_dim (int) – 表示长度的维度
mask_value (mixed) – 分配给填充位置的值
Example
>>> import torch >>> x = torch.arange(5).unsqueeze(0).repeat(3, 1) >>> x = x + torch.arange(3).unsqueeze(-1) >>> x tensor([[0, 1, 2, 3, 4], [1, 2, 3, 4, 5], [2, 3, 4, 5, 6]]) >>> length = torch.tensor([0.4, 1.0, 0.6]) >>> clean_padding_(x, length=length, mask_value=10.) >>> x tensor([[ 0, 1, 10, 10, 10], [ 1, 2, 3, 4, 5], [ 2, 3, 4, 10, 10]]) >>> x = torch.arange(5)[None, :, None].repeat(3, 1, 2) >>> x = x + torch.arange(3)[:, None, None] >>> x = x * torch.arange(1, 3)[None, None, :] >>> x = x.transpose(1, 2) >>> x tensor([[[ 0, 1, 2, 3, 4], [ 0, 2, 4, 6, 8]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 5, 6], [ 4, 6, 8, 10, 12]]]) >>> clean_padding_(x, length=length, mask_value=10., len_dim=2) >>> x tensor([[[ 0, 1, 10, 10, 10], [ 0, 2, 10, 10, 10]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 10, 10], [ 4, 6, 8, 10, 10]]])
- speechbrain.dataio.dataio.clean_padding(tensor, length, len_dim=1, mask_value=0.0)[source]
将指定张量上的任何填充值设置为mask_value。
例如,这可以用于在训练过程中将自动编码器的输出清零,超过指定长度。
此版本的操作不会修改原始张量
- Parameters:
tensor (torch.Tensor) – 任意维度的张量
length (torch.Tensor) – 一个一维张量的长度
len_dim (int) – 表示长度的维度
mask_value (mixed) – 分配给填充位置的值
- Returns:
result – 更新填充后的张量。
- Return type:
torch.Tensor
Example
>>> import torch >>> x = torch.arange(5).unsqueeze(0).repeat(3, 1) >>> x = x + torch.arange(3).unsqueeze(-1) >>> x tensor([[0, 1, 2, 3, 4], [1, 2, 3, 4, 5], [2, 3, 4, 5, 6]]) >>> length = torch.tensor([0.4, 1.0, 0.6]) >>> x_p = clean_padding(x, length=length, mask_value=10.) >>> x_p tensor([[ 0, 1, 10, 10, 10], [ 1, 2, 3, 4, 5], [ 2, 3, 4, 10, 10]]) >>> x = torch.arange(5)[None, :, None].repeat(3, 1, 2) >>> x = x + torch.arange(3)[:, None, None] >>> x = x * torch.arange(1, 3)[None, None, :] >>> x = x.transpose(1, 2) >>> x tensor([[[ 0, 1, 2, 3, 4], [ 0, 2, 4, 6, 8]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 5, 6], [ 4, 6, 8, 10, 12]]]) >>> x_p = clean_padding(x, length=length, mask_value=10., len_dim=2) >>> x_p tensor([[[ 0, 1, 10, 10, 10], [ 0, 2, 10, 10, 10]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 10, 10], [ 4, 6, 8, 10, 10]]])
- speechbrain.dataio.dataio.extract_concepts_values(sequences, keep_values, tag_in, tag_out, space)[source]
保留语义概念和值以进行评估。
- Parameters:
- Return type:
列表包含每个句子的概念和值序列。
Example
>>> sequences = [['<response>','_','n','o','_','>','_','<localisation-ville>','_','L','e','_','M','a','n','s','_','>'], ['<response>','_','s','i','_','>'],['v','a','_','b','e','n','e']] >>> results = extract_concepts_values(sequences, True, '<', '>', '_') >>> results [['<response> no', '<localisation-ville> Le Mans'], ['<response> si'], ['']]