Exploration of Audio Visual Dataset


[Figure: layout of the HDTF_TFHP AudioVisualDataset]

As shown in the figure above, a typical audio-visual dataset contains paired audio and video data. In practice the video sequence is split into clips, and each clip corresponds to an audio segment with a fixed number of samples. Because the audio is continuous, and to keep the generated video continuous as well, each individual frame is additionally paired with its own short audio window.
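
To make this mapping concrete, here is a minimal sketch (not part of the dataset code) of how a frame index maps to its audio window, assuming 16 kHz audio and 25 fps video as used by HDTF_TFHP below:

frame-to-audio mapping (sketch)
# Minimal sketch: which audio samples belong to video frame i,
# assuming a 16 kHz audio track and 25 fps video.
audio_sr = 16000
fps = 25
audio_unit = audio_sr / fps  # 640 audio samples per video frame

def audio_window_for_frame(i):
    """Return the [start, end) audio sample indices covered by frame i."""
    return round(i * audio_unit), round((i + 1) * audio_unit)

print(audio_window_for_frame(0))   # (0, 640)
print(audio_window_for_frame(99))  # (63360, 64000)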

The rest of this post uses the HDTF_TFHP dataset as an example to walk through how an audio-visual dataset is constructed and consumed.

Configuration

import os.path as osp

# data config and path
rot_repr = 'aa'
lmdb_dir = './datasets/HDTF_TFHP'
split_file = osp.join(lmdb_dir, 'train.txt')
coef_stats_file = './datasets/HDTF_TFHP/stats_train.npz'

# calculate the number of audio samples per frame
coef_fps = 25
audio_unit = 16000. / coef_fps # num of samples per frame

# select some motions as one training sample
n_motions = 100

# total number of audio samples
n_audio_samples = round(audio_unit * n_motions)

# extract two samples for two windows prediction
coef_total_len = n_motions * 2
audio_total_len = round(audio_unit * coef_total_len)
print(['audio_unit', audio_unit])
print(['n_audio_samples', n_audio_samples])
print(['coef_total_len', coef_total_len])
print(['audio_total_len', audio_total_len])
results
['audio_unit', 640.0]
['n_audio_samples', 64000]
['coef_total_len', 200]
['audio_total_len', 128000]

Computing audio samples per frame

The video runs at coef_fps = 25, i.e. 25 frames per second, while the audio is sampled at 16000 Hz, i.e. 16000 data points of the sound waveform per second, which preserves finer detail of higher-frequency sounds. Human speech mainly lies in the 300 Hz to 3400 Hz range, and a higher sampling rate helps keep tasks such as speech recognition and TTS accurate.

We therefore compute the number of audio samples corresponding to a single video frame: audio_unit = 16000 / coef_fps.

[Figure: the first 5 ms of the same 440 Hz waveform sampled at different sampling rates]
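
As a rough stand-in for that comparison figure, the snippet below (illustration only) samples the first 5 ms of a 440 Hz sine wave at a few different rates and counts how many points each rate captures:

sampling a 440 Hz wave (sketch)
import numpy as np

# Sample the first 5 ms of a 440 Hz sine wave at different sampling rates.
# Higher rates capture more points per period and thus more waveform detail.
freq = 440.0
duration = 0.005  # 5 ms

for sr in (2000, 8000, 16000):
    t = np.arange(0, duration, 1.0 / sr)   # sample timestamps
    wave = np.sin(2 * np.pi * freq * t)    # sampled waveform values
    print(f'sr={sr:>6} Hz -> {len(t)} samples in the first 5 ms')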

Training samples

In DiffPoseTalk (Sun et al., 2024), a training sample is defined as n_motions = 100 consecutive frames by default, and two consecutive training samples are selected in order to train two different windows.

This is where coef_total_len = n_motions * 2 above comes from, together with the corresponding total number of audio samples.
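
As a quick sanity check (not from the original code), the two windows simply partition the 200 cropped frames, and their audio ranges follow from audio_unit = 640:

two windows (sketch)
# Worked indices for the two-window setup (n_motions = 100):
# window 1 covers frames [0, 100), window 2 covers frames [100, 200).
n_motions = 100
audio_unit = 640.0

windows = [(0, n_motions), (n_motions, 2 * n_motions)]
for i, (f0, f1) in enumerate(windows, start=1):
    a0, a1 = round(f0 * audio_unit), round(f1 * audio_unit)
    print(f'window {i}: frames [{f0}, {f1}) -> audio samples [{a0}, {a1})')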

Data loading

Clip length

The stored audio-visual data is also organized as clips, but the stored clip length does not have to match the training sample length. We therefore first read the clip length of the stored data, so that later the required clips can be cut out of a long sequence (a subject's entire talking sequence) to form one training sample.

As shown below, each stored clip is only 100 frames long.

Get clip length
import lmdb
import pickle

lmdb_env = lmdb.open(str(lmdb_dir), readonly=True, lock=False, readahead=False, meminit=False)

with lmdb_env.begin(write=False) as txn:
    print(pickle.loads(txn.get('metadata'.encode())))
    clip_len = pickle.loads(txn.get('metadata'.encode()))['seg_len']
    audio_clip_len = round(audio_unit * clip_len)
    print(['clip_len', clip_len])
    print(['audio_clip_len', audio_clip_len])
Clip length
{'seg_len': 100}
['clip_len', 100]
['audio_clip_len', 64000]

Sequence loading

Here we load the first sequence in the split; its total length is 747 frames.

sample_0
# read the sequence names listed in the split file
with open(split_file) as f:
    entries = [line.strip() for line in f]

index = 0
# Read audio and coef
with lmdb_env.begin(write=False) as txn:
    meta_key = f'{entries[index]}/metadata'.encode()
    metadata = pickle.loads(txn.get(meta_key))
    seq_len = metadata['n_frames']
    print(metadata)
    print(['seq_len', seq_len])
sample_0
{'n_frames': 747}
['seq_len', 747]

Random cropping

To make the model somewhat robust to variations in audio length, a fixed-length segment is randomly cropped from the sequence and used as the training sample.

crop
import numpy as np

crop_strategy = 'random'
# Crop the audio and coef
if crop_strategy == 'random':
    start_frame = np.random.randint(0, seq_len - coef_total_len + 1)
elif crop_strategy == 'begin':
    start_frame = 0
elif crop_strategy == 'end':
    start_frame = seq_len - coef_total_len
else:
    raise ValueError(f'Unknown crop strategy: {crop_strategy}')

print(['start_frame', start_frame])
crop_start_frame
['start_frame', 272]

Clip extraction

After randomly choosing a start position within the whole sequence, we extract the clips that cover the required number of motions, starting from that start frame.

First, start_frame and coef_total_len determine which stored clips the crop spans, and, for each of those clips, the required start and end frame indices within the clip.

Then the corresponding clips are read from the LMDB database and the FLAME coefficients of the required frame range are sliced out of each clip.

Finally, the audio start and end positions are derived from the number of audio samples per frame, and the matching audio range is sliced out of each clip's audio.

Clip extraction
import io
import torchaudio

coef_keys = ['shape', 'exp', 'pose']
coef_dict = {k: [] for k in coef_keys}
audio = []

# locate the clip boundaries
start_clip = start_frame // clip_len
end_clip = (start_frame + coef_total_len - 1) // clip_len + 1
with lmdb_env.begin(write=False) as txn:
    for clip_idx in range(start_clip, end_clip):
        print(f'clip_idx: {clip_idx:03d}')
        key = f'{entries[0]}/{clip_idx:03d}'.encode()
        start_idx = max(start_frame - clip_idx * clip_len, 0)
        end_idx = min(start_frame + coef_total_len - clip_idx * clip_len, clip_len)
        print([start_idx, end_idx])

        entry = pickle.loads(txn.get(key))
        for coef_key in coef_keys:
            coef_dict[coef_key].append(entry['coef'][coef_key][start_idx:end_idx])

        audio_data = entry['audio']
        audio_clip, sr = torchaudio.load(io.BytesIO(audio_data))
        assert sr == 16000, f'Invalid sampling rate: {sr}'
        audio_clip = audio_clip.squeeze()
        audio.append(audio_clip[round(start_idx * audio_unit):round(end_idx * audio_unit)])
        print(['audio start id:', round(start_idx * audio_unit)])
        print(['audio end id:', round(end_idx * audio_unit)])
Clip extraction
clip_idx: 002
[72, 100]
['audio start id:', 46080]
['audio end id:', 64000]
clip_idx: 003
[0, 100]
['audio start id:', 0]
['audio end id:', 64000]
clip_idx: 004
[0, 72]
['audio start id:', 0]
['audio end id:', 46080]

Data processing

Audio normalization

The extracted FLAME coefficients and audio segments are concatenated, and the audio is normalized to improve training stability and convergence.

normalize(audio)
import torch

# concat the parameters
coef_dict = {k: torch.tensor(np.concatenate(coef_dict[k], axis=0)) for k in coef_keys}
assert coef_dict['exp'].shape[0] == coef_total_len, f'Invalid coef length: {coef_dict["exp"].shape[0]}'

# concat and normalize the audio data
audio = torch.cat(audio, dim=0)
print(f"audio shape: {audio.shape}")
assert audio.shape[0] == coef_total_len * audio_unit, f'Invalid audio length: {audio.shape[0]}'
audio_mean = audio.mean()
audio_std = audio.std()
print(f"audio mean: {audio_mean}, audio std: {audio_std}")
audio = (audio - audio_mean) / (audio_std + 1e-5)
normalize(audio)
audio shape: torch.Size([128000])
audio mean: -4.469394752959488e-06, audio std: 0.025712737813591957

Coefficient normalization

Similarly, the FLAME coefficients are normalized to ease training, but here this is done with pre-computed means and standard deviations of each FLAME coefficient.
Whereas audio statistics are strongly tied to a speaker's identity (which is why the audio is normalized per sample), all coefficients represented by the FLAME parameters live in the same feature space and share the same data characteristics, so dataset-level statistics can be used.

Finally, the extracted motions are split into two sub-samples, one for each of the two prediction windows.
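
The walkthrough has not yet loaded coef_stats. A minimal way to load it from the coef_stats_file defined in the configuration, assuming (as the dataset class at the end of this post does) that the .npz file stores one {key}_mean and one {key}_std array per coefficient:

load coef_stats (sketch)
# load the pre-computed per-coefficient statistics (e.g. 'exp_mean', 'exp_std')
coef_stats = dict(np.load(coef_stats_file))
coef_stats = {k: torch.tensor(v) for k, v in coef_stats.items()}
print(list(coef_stats.keys()))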

normalize(coefficients)
# normalize coef if applicable
if coef_stats is not None:
    coef_dict = {k: (coef_dict[k] - coef_stats[f'{k}_mean']) / (coef_stats[f'{k}_std'] + 1e-9)
                 for k in coef_keys}

# Extract two consecutive audio/coef clips
audio_pair = [audio[:n_audio_samples].clone(), audio[-n_audio_samples:].clone()]
coef_pair = [{k: coef_dict[k][:n_motions].clone() for k in coef_keys},
             {k: coef_dict[k][-n_motions:].clone() for k in coef_keys}]

Building the dataset class

import os
import pickle
import lmdb
import io
import torchaudio
import torch
import numpy as np
from base import Datum, DatasetBase, DATASET_REGISTRY
import logging

logger = logging.getLogger(__name__)


@DATASET_REGISTRY.register()
class HDTF_TFHP(DatasetBase):

    def __init__(self, cfg):
        # data config and path
        root = os.path.abspath(os.path.expanduser(cfg.ROOT))
        self.dataset_dir = os.path.join(root, cfg.NAME)
        lmdb_path = self.dataset_dir
        split_path = [os.path.join(self.dataset_dir, cfg.HDTF_TFHP.TRAIN),
                      os.path.join(self.dataset_dir, cfg.HDTF_TFHP.VAL),
                      os.path.join(self.dataset_dir, cfg.HDTF_TFHP.TEST)]
        coef_stats_path = os.path.join(self.dataset_dir, cfg.HDTF_TFHP.COEF_STATS)
        if os.path.exists(coef_stats_path):
            coef_stats = dict(np.load(coef_stats_path))
            self.coef_stats = {x: torch.tensor(coef_stats[x]) for x in coef_stats}
        else:
            self.coef_stats = None
            logger.warning('Warning: No stats file found. Coef will not be normalized.')

        # calculate the number of audio samples per frame
        self.audio_unit = cfg.HDTF_TFHP.AUDIO_SR / cfg.HDTF_TFHP.COEF_FPS

        # total number of motions and audio samples
        self.n_motions = cfg.HDTF_TFHP.MOTIONS
        self.n_audio_samples = round(self.audio_unit * self.n_motions)
        self.coef_total_len = self.n_motions * 2
        self.audio_total_len = round(self.audio_unit * self.coef_total_len)

        # Load lmdb env and get the clip len
        lmdb_env = lmdb.open(str(lmdb_path), readonly=True, lock=False, readahead=False, meminit=False)
        with lmdb_env.begin(write=False) as txn:
            self.clip_len = pickle.loads(txn.get('metadata'.encode()))['seg_len']
            self.audio_clip_len = round(self.audio_unit * self.clip_len)

        # Read split files
        subjects_dict = {"train": [], "val": [], "test": []}
        for split, fpath in zip(subjects_dict, split_path):
            with open(fpath) as f:
                for line in f:
                    subjects_dict[split].append(line.strip())

        data_dict = {"train": [], "val": [], "test": []}
        for split in ["train", "val", "test"]:
            for subject in subjects_dict[split]:
                # Read audio and coef
                with lmdb_env.begin(write=False) as txn:
                    meta_key = f'{subject}/metadata'.encode()
                    metadata = pickle.loads(txn.get(meta_key))
                    seq_len = metadata['n_frames']

                # Crop the audio and coef
                if cfg.HDTF_TFHP.CROP == 'random':
                    start_frame = np.random.randint(0, seq_len - self.coef_total_len + 1)
                elif cfg.HDTF_TFHP.CROP == 'begin':
                    start_frame = 0
                elif cfg.HDTF_TFHP.CROP == 'end':
                    start_frame = seq_len - self.coef_total_len
                else:
                    raise ValueError(f'Unknown crop strategy: {cfg.HDTF_TFHP.CROP}')

                coef_dict = {'shape': [], 'exp': [], 'pose': []}
                audio = []
                start_clip = start_frame // self.clip_len
                end_clip = (start_frame + self.coef_total_len - 1) // self.clip_len + 1
                with lmdb_env.begin(write=False) as txn:
                    for clip_idx in range(start_clip, end_clip):
                        key = f'{subject}/{clip_idx:03d}'.encode()
                        start_idx = max(start_frame - clip_idx * self.clip_len, 0)
                        end_idx = min(start_frame + self.coef_total_len - clip_idx * self.clip_len, self.clip_len)

                        # load the coefficients
                        entry = pickle.loads(txn.get(key))
                        for coef_key in ['shape', 'exp', 'pose']:
                            coef_dict[coef_key].append(entry['coef'][coef_key][start_idx:end_idx])

                        audio_data = entry['audio']
                        audio_clip, audio_sr = torchaudio.load(io.BytesIO(audio_data))
                        assert audio_sr == cfg.HDTF_TFHP.AUDIO_SR, f'Invalid sampling rate: {audio_sr}'
                        audio_clip = audio_clip.squeeze()
                        audio.append(audio_clip[round(start_idx * self.audio_unit):round(end_idx * self.audio_unit)])

                coef_dict = {k: torch.tensor(np.concatenate(coef_dict[k], axis=0)) for k in ['shape', 'exp', 'pose']}
                assert coef_dict['exp'].shape[0] == self.coef_total_len, f'Invalid coef length: {coef_dict["exp"].shape[0]}'
                audio = torch.cat(audio, dim=0)
                assert audio.shape[0] == self.coef_total_len * self.audio_unit, f'Invalid audio length: {audio.shape[0]}'
                audio_mean, audio_std = audio.mean(), audio.std()
                audio = (audio - audio_mean) / (audio_std + 1e-5)

                # normalize coef if applicable
                if self.coef_stats is not None:
                    coef_dict = {k: (coef_dict[k] - self.coef_stats[f'{k}_mean']) / (self.coef_stats[f'{k}_std'] + 1e-9)
                                 for k in ['shape', 'exp', 'pose']}

                data_dict[split].append(Datum(name=subject, audio=audio, coefficients=coef_dict))

        super().__init__(train=data_dict['train'], val=data_dict['val'], test=data_dict['test'])
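
For reference, a hypothetical way to instantiate the class above, with a SimpleNamespace standing in for the real config object; the split file names val.txt and test.txt are assumptions, the other values mirror the walkthrough:

usage (sketch)
from types import SimpleNamespace

# hypothetical config object; the real project presumably uses its own config class
cfg = SimpleNamespace(
    ROOT='./datasets',
    NAME='HDTF_TFHP',
    HDTF_TFHP=SimpleNamespace(
        TRAIN='train.txt',
        VAL='val.txt',    # assumed file name
        TEST='test.txt',  # assumed file name
        COEF_STATS='stats_train.npz',
        AUDIO_SR=16000,
        COEF_FPS=25,
        MOTIONS=100,
        CROP='random',
    ),
)

dataset = HDTF_TFHP(cfg)  # builds the train/val/test splits described above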