【コード解説・PyTorch】手話認識入門14 - 様々な改善手法5: アフィン変換によるデータ拡張

著者: Natsuki Takayama
作成日: 2024年06月04日(火) 00:00
最終更新日: 2024年06月27日(木) 11:04
カテゴリ: コンピュータビジョン

こんにちは.高山です.
先日の記事で告知しました手話入門記事の第十四回になります.

今回は手話動画から抽出した追跡点系列に対して,データ拡張を施すことで認識性能を改善する手法を紹介します.
具体的には,手話中の追跡点系列に対してアフィン変換を施すことで,様々な姿勢およびサイズの手話データを生成する手法を紹介します.

図1にアフィン変換の適用例を示します.

(a): アフィン変換前
(b): アフィン変換後
アフィン変換による追跡点系列の変形例

図1(b) では,平行移動,回転,拡大,およびせん断を組み合わせたアフィン変換を適用しています.

追跡点の座標値は,話者の体格やカメラとの位置関係によって変化します.
この問題に対しては,第四回の記事で紹介した追跡点の正規化でもある程度対処できます.
今回紹介するアフィン変換によるデータ拡張を併用することで,頑健性がより向上することが期待できます.

その他の観点としては,追跡点ベースの認識は画像ベースの認識よりも平行移動などに対して影響を受けやすい (認識結果が変わってしまいやすい) ことが挙げられます [Duan'22].
これは,画像データに比べて追跡点データに含まれている情報が少ないためと考えられています.
(その分,軽量で高速に動作するという利点があります)

アフィン変換により姿勢およびサイズのバリエーションを増やすことで,上記の問題に対しても頑健になることが期待できます.

今回解説するスクリプトはGitHub上に公開しています
複数の実験を行っている都合で,CPUで動かした場合は結構時間がかるのでご注意ください.

  • [Duan'22] H. Duan., et al. "Revisiting Skeleton-based Action Recognition," CVPR 2022.

1. 概要

1.1 今回説明する内容

実装の詳細に先立って,今回紹介する内容の概要を説明したいと思います.
図2は,先日の記事で説明した機械学習モデル構築のワークフローの何処が今回の説明箇所に該当するかを示しています.

図2: 学習モデル構築のワークフローと紹介箇所
学習モデル構築のワークフローと紹介箇所

今回説明するデータ拡張は,学習用データセットからデータを取り出す際に行う,前処理に該当します.
特徴量エンジニアリングとデータ拡張の関係については,第一回の記事 (第1.2項と第1.3項) または第十一回の記事 (第1.1項) をご参照ください.

  • [Amershi'19]: S. Amershi, et al., "Software Engineering for Machine Learning: A Case Study," IEEE/ACM ICSE-SEIP 2019.

1.2 追跡点系列のアフィン変換処理工程

図3に,追跡点系列のアフィン変換処理工程を示します.

図3: アフィン変換処理工程
アフィン変換処理工程

処理は非常にシンプルで,変換行列の算出とアフィン変換の適用から構成されます.
図3の右側に示しているとおり,変換は(b): 変換中心を原点に移動,(c): 拡大縮小,(d): 回転,(e): せん断,および (f): 平行移動の順で適用されます.
(実際は,これらの変換を合成した行列を算出します)

アフィン変換のより細かな話については以前の記事で解説しておりますので,併せてご一読いただければうれしいです.


1.3 正規化処理との組み合わせ

アフィン変換の基本的な処理工程は第1.2項で説明したとおりです.
ただし,正規化処理と組み合わせる場合は,処理の適用方法によって結果が異なるので注意する必要があります.

正規化前に全身に適用

図4に,正規化処理の前にアフィン変換を全身に対して適用した例を示します.

図4: アフィン変換の適用方法1: 正規化前に全身に適用
アフィン変換の適用方法1: 正規化前に全身に適用

この処理では,同一パラメータのアフィン変換が全追跡点に対して適用されます.
ただし,後に正規化を行っていることから,平行移動と拡大縮小について無効化されます.

正規化後に全身に適用

図5に,正規化処理の後にアフィン変換を全身に対して適用した例を示します.

図5: アフィン変換の適用方法2: 正規化後に全身に適用
アフィン変換の適用方法2: 正規化後に全身に適用

この処理では,同一パラメータのアフィン変換が全追跡点に対して適用されます.
正規化の後にアフィン変換を行っていることから,全ての変換が適用されます.
(見えづらくて申し訳ないですが,\([0, 1]\)の範囲を表す白矩形が図4よりも小さく写っています)

正規化後に部位毎に適用

図6に,正規化処理の後にアフィン変換を部位毎に適用した例を示します.

図6: アフィン変換の適用方法3: 正規化後に部位毎に適用
アフィン変換の適用方法3: 正規化後に部位毎に適用

この処理では,部位毎に異なるパラメータでアフィン変換を適用しています.
ここでは,違いが分かりやすいように,顔だけにアフィン変換を適用しています.

この処理ではより複雑な姿勢およびサイズの追跡点データを生成できます.
その代わり,パラメータの数が多いので調整は少し大変です.


1.4 先に結果

第2節以降では,いつも通り実装の紹介をしながら実験結果をお見せします.
コード紹介記事の方針として記事単体で全処理が分かるように書いており,少し長いので結果を先にお見せしたいと思います.

図7は,データ拡張が無い場合,およびアフィン変換の適用方法毎の,Validation Lossと認識率の推移を示しています.

図7: 認識性能比較結果
認識性能比較結果

横軸は学習・評価ループの繰り返し数 (Epoch) を示します.
縦軸はそれぞれの評価指標を示します.

各線の色と実験条件の関係は次のとおりです.

  • 青線 (Default): Pre-LN構成のTransformer
  • 橙線 (+ Pre-W): 正規化前に全身に適用
  • 緑線 (+ Post-W): 正規化後に全身に適用
  • 赤線 (+ Post-P): 正規化後に部位毎に適用

デフォルトのモデルには,第九回の記事で紹介した,Pre-LN構成のTransformerモデルを用います.

青線と他の線の比較結果から,データ拡張を適用した場合はロスの値,認識性能ともに改善していることが分かります.

なお,今回の実験では話を簡単にするために,実験条件以外のパラメータは固定にし,乱数の制御もしていません.
必ずしも同様の結果になるわけではないので,ご了承ください.


2. 前準備

2.1 データセットのダウンロード

ここからは実装方法の説明をしていきます.
まずは,前準備としてGoogle Colabにデータセットをアップロードします. ここの工程はこれまでの記事と同じですので,既に行ったことのある方は第2.3項まで飛ばしていただいて構いません.

まず最初に,データセットの格納先からデータをダウンロードし,ご自分のGoogle driveへアップロードしてください.

次のコードでGoogle driveをColabへマウントします.
Google Driveのマウント方法については,補足記事にも記載してあります.

1
2
3
from google.colab import drive

drive.mount("/content/drive")

ドライブ内のファイルをColabへコピーします.
パスはアップロード先を設定する必要があります.

# Copy to local.
!cp [path_to_dataset]/gislr_dataset_top10.zip gislr_top10.zip

データセットはZIP形式になっているので unzip コマンドで解凍します.

!unzip gislr_top10.zip
Archive:  gislr_top10.zip
   creating: dataset_top10/
  inflating: dataset_top10/16069.hdf5
  ...
  inflating: dataset_top10/sign_to_prediction_index_map.json

成功すると dataset_top10 以下にデータが解凍されます.
HDF5ファイルはデータ本体で,手話者毎にファイルが別れています.
JSONファイルは辞書ファイルで,TXTファイルは本データセットのライセンスです.

!ls dataset_top10
16069.hdf5  25571.hdf5  29302.hdf5  36257.hdf5  49445.hdf5  62590.hdf5
18796.hdf5  26734.hdf5  30680.hdf5  37055.hdf5  53618.hdf5  LICENSE.txt
2044.hdf5   27610.hdf5  32319.hdf5  37779.hdf5  55372.hdf5  sign_to_prediction_index_map.json
22343.hdf5  28656.hdf5  34503.hdf5  4718.hdf5   61333.hdf5

単語辞書には単語名と数値の関係が10単語分定義されています.

!cat dataset_top10/sign_to_prediction_index_map.json
{
    "listen": 0,
    "look": 1,
    "shhh": 2,
    "donkey": 3,
    "mouse": 4,
    "duck": 5,
    "uncle": 6,
    "hear": 7,
    "pretend": 8,
    "cow": 9
}

ライセンスはオリジナルと同様に,CC-BY 4.0 としています.

!cat dataset_top10/LICENSE.txt
The dataset provided by Natsuki Takayama (Takayama Research and Development Office) is licensed under CC-BY 4.0.
Author: Copyright 2024 Natsuki Takayama
Title: GISLR Top 10 dataset
Original licenser: Deaf Professional Arts Network and the Georgia Institute of Technology
Modification
- Extract 10 most frequent words.
- Packaged into HDF5 format.

次のコードでサンプルを確認します.
サンプルは辞書型のようにキーバリュー形式で保存されており,下記のように階層化されています.

- サンプルID (トップ階層のKey)
  |- feature: 入力特徴量で `[C(=3), T, J(=543)]` 形状.C,T,Jは,それぞれ特徴次元,フレーム数,追跡点数です.
  |- token: 単語ラベル値で `[1]` 形状.0から9の数値です.
1
2
3
4
5
6
7
8
9
with h5py.File("dataset_top10/16069.hdf5", "r") as fread:
    keys = list(fread.keys())
    print(keys)
    group = fread[keys[0]]
    print(group.keys())
    feature = group["feature"][:]
    token = group["token"][:]
    print(feature.shape)
    print(token)
['1109479272', '11121526', ..., '976754415']
<KeysViewHDF5 ['feature', 'token']>
(3, 23, 543)
[1]

2.2 モジュールのダウンロード

次に,過去の記事で実装したコードをダウンロードします.
本項は前回までに紹介した内容と同じですので,飛ばしていただいても構いません. コードはGithubのsrc/modules_gislrにアップしてあります (今後の記事で使用するコードも含まれています).

まず,下記のコマンドでレポジトリをダウンロードします.
(目的のディレクトリだけダウンロードする方法はまだ調査中です(^^;))

!wget https://github.com/takayama-rado/trado_samples/archive/master.zip
--2024-01-21 11:01:47--  https://github.com/takayama-rado/trado_samples/archive/master.zip
Resolving github.com (github.com)... 140.82.112.3
...
2024-01-21 11:01:51 (19.4 MB/s) - ‘master.zip’ saved [75710869]

ダウンロードしたリポジトリを解凍します.

!unzip -o master.zip -d master
Archive:  master.zip
641b06a0ca7f5430a945a53b4825e22b5f3b8eb6
   creating: master/trado_samples-main/
  inflating: master/trado_samples-main/.gitignore
  ...

モジュールのディレクトリをカレントディレクトリに移動します.

!mv master/trado_samples-main/src/modules_gislr .

他のファイルは不要なので削除します.

!rm -rf master master.zip gislr_top10.zip
!ls
dataset_top10 drive modules_gislr  sample_data

2.3 モジュールのロード

主要な処理の実装に先立って,下記のコードでモジュールをロードします.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import copy
import json
import math
import os
import random
import sys
from functools import partial
from inspect import signature
from pathlib import Path
from typing import (
    Any,
    Dict,
    List
)

# Third party's modules
import numpy as np

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import (
    DataLoader)

from torchvision.transforms import Compose

# Local modules
sys.path.append("modules_gislr")
from modules_gislr.dataset import (
    HDF5Dataset,
    merge_padded_batch)
from modules_gislr.defines import (
    get_fullbody_landmarks
)
from modules_gislr.layers import (
    Identity,
    GPoolRecognitionHead,
    TransformerEnISLR
)
from modules_gislr.train_functions import (
    test_loop,
    val_loop,
    train_loop
)
from modules_gislr.transforms import (
    PartsBasedNormalization,
    ReplaceNan,
    SelectLandmarksAndFeature,
    ToTensor
)
【コード解説】
- 標準モジュール
  - copy: データコピーライブラリ.Transformerブロック内でEncoder層をコピーするために使用します.
  - json: JSONファイル制御ライブラリ.辞書ファイルのロードに使用します.
  - math: 数学計算処理ライブラリ
  - os: システム処理ライブラリ
  - random: ランダム値生成ライブラリ
  - sys: Pythonインタプリタの制御ライブラリ.
    今回はローカルモジュールに対してパスを通すために使用します.
  - functools: 関数オブジェクトを操作するためのライブラリ.
    今回はDataLoaderクラスに渡すパディング関数に対して設定値をセットするために使用します.
  - inspect.signature: オブジェクトの情報取得ライブラリ.
  - pathlib.Path: オブジェクト指向のファイルシステム機能.
    主にファイルアクセスに使います.osモジュールを使っても同様の処理は可能です.
    高山の好みでこちらのモジュールを使っています(^^;).
  - typing: 関数などに型アノテーションを行う機能.
    ここでは型を忘れやすい関数に付けていますが,本来は全てアノテーションをした方が良いでしょう(^^;).
- 3rdパーティモジュール
  - numpy: 行列演算ライブラリ
  - torch: ニューラルネットワークライブラリ
  - torchvision: PyTorchと親和性が高い画像処理ライブラリ.
    今回はDatasetクラスに与える前処理をパッケージするために用います.
- ローカルモジュール: sys.pathにパスを追加することでロード可能
  - dataset: データセット操作用モジュール
  - defines: 各部位の追跡点,追跡点間の接続関係,およびそれらへのアクセス処理を
    定義したモジュール
  - layers: ニューラルネットワークのモデルやレイヤモジュール
  - transforms: 入出力変換処理モジュール
  - train_functions: 学習・評価処理モジュール

3. アフィン変換処理の実装

本節では,アフィン変換処理を実装します.
細かな実装については,以前の記事でも説明しているので,併せてご一読いただければうれしいです.

変換行列の算出処理

まず,次のコードで入力パラメータに応じた変換行列を算出する関数を実装します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_affine_matrix_2d(center,
                         trans,
                         scale,
                         rot,
                         skew,
                         to_radians=True,
                         order=["center", "scale", "rot", "skew", "trans"],
                         dtype=np.float32):
    center = np.array(center)
    trans = np.array(trans)
    scale = np.array(scale)
    center_m = np.array([[1, 0, -center[0]],
                         [0, 1, -center[1]],
                         [0, 0, 1]])
    scale_m = np.array([[scale[0], 0, 0],
                        [0, scale[1], 0],
                        [0, 0, 1]])
    rot = np.radians(rot) if to_radians else rot
    _cos = np.cos(rot)
    _sin = np.sin(rot)
    rot_m = np.array([[_cos, -_sin, 0],
                      [_sin, _cos, 0],
                      [0, 0, 1]])
    _tan = np.tan(np.radians(skew)) if to_radians else np.tan(skew)
    skew_m = np.array([[1, _tan[0], 0],
                       [_tan[1], 1, 0],
                       [0, 0, 1]])
    move = center + trans
    trans_m = np.array([[1, 0, move[0]],
                        [0, 1, move[1]],
                        [0, 0, 1]])

    mat = np.identity(3, dtype=dtype)
    for name in order:
        if name == "center":
            mat = np.matmul(center_m, mat)
        if name == "scale":
            mat = np.matmul(scale_m, mat)
        if name == "rot":
            mat = np.matmul(rot_m, mat)
        if name == "skew":
            mat = np.matmul(skew_m, mat)
        if name == "trans":
            mat = np.matmul(trans_m, mat)
    return mat.astype(dtype)
- 引数
  - center: 変換軸座標 `(center_x, center_y)`
    通常は物体中心位置や特定の追跡点位置を指定します.
  - trans: 平行移動量 `(trans_x, trans_y)`
  - scale: 拡大縮小量 `(scale_x, scale_y)`
  - rot: 回転量
    この値のみスカラーです.
  - skew: せん断量 `(skew_x, skew_y)`
  - to_radians: `True`の場合,内部で`rot`と`skew`をラジアンに変換します.
    この設定の場合は,`rot`と`skew`は度数で指定する必要があります.
    逆に,`False`の場合は,`rot`と`skew`はラジアンで指定する必要があります.
  - order: 変換の適用順番
    デフォルトでは,物体中心を原点に移動 -> 拡大縮小 -> 回転 -> せん断 -> 平行移動
    の順で変換が適用されます.
  - dtype: 出力データ型
- 9-11行目: 処理を簡単にするために,引数の型をnp.array型で統一 (`rot`と`skew`は内部処理の過程で変換される)
- 12-31行目: 各変換行列を算出
  `center_m` の算出では,指定座標を原点に移動するためにマイナスをかけた値を移動量として設定しています.
  回転とせん断はラジアン値を入力として,それぞれ対応する三角関数を適用した値を設定しています.
  平行移動では,最初に行う指定座標の原点への移動をオフセットとして加えた値を移動量として設定しています.
- 33-44行目: 初めに`mat` を単位行列で初期化し,`order` で指定した順番で変換行列を適用
- 44行目: `dtype` で指定した型に変換して値を返す

アフィン変換の適用処理

次のコードで,追跡点系列に変換行列を適用する関数を実装します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def apply_affine(inputs, mat, channel_first=True):
    if channel_first:
        # `[C, T, J] -> [T, J, C]`
        inputs = inputs.transpose([1, 2, 0])
    xy = inputs[:, :, :2]
    xy = np.concatenate([xy, np.ones([xy.shape[0], xy.shape[1], 1])], axis=-1)
    xy = np.einsum("...j,ij", xy, mat)
    inputs[:, :, :2] = xy[:, :, :-1]
    if channel_first:
        # `[T, J, C] -> [C, T, J]`
        inputs = inputs.transpose([2, 0, 1])
    return inputs
- 引数:
  - inputs: 追跡点配列
  - mat: アフィン変換行列 `[3, 3]`
  - channel_first: `True` の場合,内部で`[C, T, J] -> [T, J, C]` 形式に変換して処理を適用.
    処理適用後は元の形状に戻して返します.
- 5-6行目: 追跡点配列から $(x, y)$ 座標列を取り出して,特徴量次元の末尾に $1$ を加えて同次座標形式に変換
- 7行目: `xy` の特徴量次元に対してアフィン変換行列を適用
- 8行目: 変換後の$(x, y)$ 座標列を `inputs` に代入して返す

7行目の変換行列の適用では,アインシュタインの縮約表記を用いた演算 (einsum()) を行っています.
einsum() 関数については簡単な解説記事を以前に執筆しておりますので,併せてご一読いただければ幸いです.

データ拡張用クラスの実装

最後に,次のコードでデータ拡張用のクラスを実装します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class RandomAffineTransform2D():
    def __init__(self,
                 apply_ratio,
                 center_joints,
                 target_joints,
                 trans_range,
                 scale_range,
                 rot_range,
                 skew_range,
                 channel_first=True,
                 apply_post_mask=True,
                 random_seed=None,
                 order=["center", "scale", "rot", "skew", "trans"],
                 dtype=np.float32):
        self.apply_ratio = apply_ratio
        self.center_joints = center_joints
        self.target_joints = target_joints
        self.trans_range = trans_range
        self.scale_range = scale_range
        self.rot_range = np.radians(rot_range)
        self.skew_range = np.radians(skew_range)
        self.order = order
        self.dtype = dtype
        self.channel_first = channel_first
        self.apply_post_mask = apply_post_mask
        if random_seed is not None:
            self.rng = np.random.default_rng(random_seed)
        else:
            self.rng = np.random.default_rng()

    def __call__(self,
                 data: Dict[str, Any]) -> Dict[str, Any]:
        if self.rng.uniform() > self.apply_ratio:
            return data
        # `[C, T, J]`
        feature = data["feature"]

        # Calculate center position.
        temp = feature[:, :, self.center_joints]
        mask = np.sum(temp, axis=(0, 2)) != 0
        if np.all(mask == np.False_):
            return data

        # Use x and y only.
        # `[C, T, J] -> [C, J] -> [C]`
        center = temp[:, mask].mean(axis=1).mean(axis=1)[:2]

        trans = self.rng.uniform(self.trans_range[0], self.trans_range[1], 2)
        scale = self.rng.uniform(self.scale_range[0], self.scale_range[1], 2)
        rot = self.rng.uniform(self.rot_range[0], self.rot_range[1])
        skew = self.rng.uniform(self.skew_range[0], self.skew_range[1], 2)

        # Calculate matrix.
        mat = get_affine_matrix_2d(center, trans, scale, rot, skew,
            to_radians=False, order=self.order, dtype=self.dtype)

        # Apply transform.
        if self.apply_post_mask:
            temp = feature.reshape([feature.shape[0], -1])
            mask = (temp == 0).all(axis=0).reshape(
                [1, feature.shape[1], feature.shape[2]])
            mask = np.bitwise_not(mask).astype(feature.dtype)
        else:
            mask = None
        target = feature[:, :, self.target_joints]
        target = apply_affine(target, mat, self.channel_first)
        feature[:, :, self.target_joints] = target
        if mask is not None:
            feature *= mask
        data["feature"] = feature
        return data
- 引数
  - apply_ratio: データ拡張の適用確率
  - center_joints: 変換軸座標算出に使用する追跡点インデクス.
    ここで指定した追跡点の重心 (の全フレーム平均) が変換軸になります.
  - target_joints: 変換を適用する追跡点インデクス.
    部位毎に適用する場合は,ここの設定を変えたインスタンスを部位毎に生成する
    必要があります.
  - trans_range: 平行移動量の範囲 `(minimum, maximum)`
  - scale_range: 拡大縮小量の範囲 `(minimum, maximum)`
  - rot_range: 回転量の範囲,度数で指定 `(minimum, maximum)`
  - skew_range: せん断量の範囲,度数で指定 `(minimum, maximum)`
  - channel_first: `True` の場合は,入力形状が `[C, T, J]` であると想定して処理を行う.
    `False` の場合は,入力形状が `[T, J, C]` であると想定して処理を行います.
  - apply_post_mask: `True` の場合,後処理で追跡に失敗している点のマスク処理を行います.
  - random_seed: 疑似乱数生成器のシード,Noneの場合はNumpyのグローバル設定が用いられる
  - order: 変換の適用順番
    デフォルトでは,物体中心を原点に移動 -> 拡大縮小 -> 回転 -> せん断 -> 平行移動
    の順で変換が適用されます.
  - dtype: 出力データ型
- 15-29行目: 初期化処理
  26-29行目では疑似乱数生成器を生成しています.`random_seed` が指定されている場合は
  その値を用いて生成し,`None` の場合はNumpyのグローバル設定を用いるようにしています.
- 31-71行目: アフィン変換処理
  - 33-34行目: 乱数を生成し,`apply_ratio` 以上だった場合は何もせずに値を返す
  - 38-46行目: 変換軸を算出
    まず初めに,`center_joints` で指定した追跡点配列を抽出します.
    次に,欠損フレームを除去するための `mask` を生成します.
    最後に,`mask` を適用した上で平均座標 (x, y) を算出し `center` としています.
  - 48-51行目: 指定した範囲で各変換パラメータをランダムに生成
  - 54-55行目: アフィン変換行列を算出
    ここでは先程実装した `get_affine_matrix_2d()` をクラスから呼び出すようにしています.
  - 58-64行目: 後処理用のマスクを算出
    追跡成功を `1`,失敗を `0` で示すマスクを算出しています.
  - 65-67行目: アフィン変換適用
    ここでは先程実装した `apply_affine` をクラスから呼び出すようにしています.
  - 68-69行目: マスクを適用して,追跡失敗点の座標をゼロクリア

4. 認識モデルの動作確認

今回は,第九回の記事で紹介した,Pre-LN構成のTransformerモデルをそのまま用いて実験を行います.
ここではモデルの推論動作が正常に動くかだけ確かめます.

次のコードでデータセットからHDF5ファイルとJSONファイルのパスを読み込みます.

1
2
3
4
5
6
7
8
# Access check.
dataset_dir = Path("dataset_top10")
files = list(dataset_dir.iterdir())
dictionary = [fin for fin in files if ".json" in fin.name][0]
hdf5_files = [fin for fin in files if ".hdf5" in fin.name]

print(dictionary)
print(hdf5_files)
dataset_top10/sign_to_prediction_index_map.json
[PosixPath('dataset_top10/2044.hdf5'), PosixPath('dataset_top10/32319.hdf5'), PosixPath('dataset_top10/18796.hdf5'), PosixPath('dataset_top10/36257.hdf5'), PosixPath('dataset_top10/62590.hdf5'), PosixPath('dataset_top10/16069.hdf5'), PosixPath('dataset_top10/29302.hdf5'), PosixPath('dataset_top10/34503.hdf5'), PosixPath('dataset_top10/37055.hdf5'), PosixPath('dataset_top10/37779.hdf5'), PosixPath('dataset_top10/27610.hdf5'), PosixPath('dataset_top10/53618.hdf5'), PosixPath('dataset_top10/49445.hdf5'), PosixPath('dataset_top10/30680.hdf5'), PosixPath('dataset_top10/22343.hdf5'), PosixPath('dataset_top10/55372.hdf5'), PosixPath('dataset_top10/26734.hdf5'), PosixPath('dataset_top10/28656.hdf5'), PosixPath('dataset_top10/61333.hdf5'), PosixPath('dataset_top10/4718.hdf5'), PosixPath('dataset_top10/25571.hdf5')]

次のコードで辞書ファイルをロードして,認識対象の単語数を格納します.

1
2
3
4
5
# Load dictionary.
with open(dictionary, "r") as fread:
    key2token = json.load(fread)

VOCAB = len(key2token)

次のコードで前処理を定義します.
固定の前処理には,以前に説明した追跡点の選定を適用して実験を行います.

アフィン変換の後に正規化を行う場合は,追跡点の正規化を固定の前処理に入れることはできません.
そのため,追跡点の正規化の有無に従って,pre_transforms_w_normpre_transforms_wo_norm の2種類の固定前処理を定義しています.

データ拡張処理は動的な前処理として,transforms_pre_saffine_whole (22-32行目,正規化前に全体に適用),transforms_post_saffine_whole (34-43行目,正規化後に全体に適用),transforms_post_saffine_parts (45-82行目, 正規化後に部位毎に適用) に定義しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
_, use_landmarks = get_fullbody_landmarks()
use_features = ["x", "y"]
trans_select_feature = SelectLandmarksAndFeature(landmarks=use_landmarks, features=use_features)
trans_repnan = ReplaceNan()
trans_norm = PartsBasedNormalization(align_mode="framewise", scale_mode="unique")

apply_ratio = 0.5
trans_range = (-0.1, 0.1)
scale_range = (1.0/1.5, 1.5)
rot_range = (-30, 30)
skew_range = (-30, 30)

pre_transforms_w_norm = Compose([trans_select_feature,
                                 trans_repnan,
                                 trans_norm])

pre_transforms_wo_norm = Compose([trans_select_feature,
                                  trans_repnan])

transforms_default = Compose([ToTensor()])

transforms_pre_saffine_whole = Compose([
    RandomAffineTransform2D(
        apply_ratio=apply_ratio,
        center_joints=[0, 2],
        target_joints=np.arange(0, len(use_landmarks)),
        trans_range=trans_range,
        scale_range=scale_range,
        rot_range=rot_range,
        skew_range=skew_range),
    trans_norm,
    ToTensor()])

transforms_post_saffine_whole = Compose([
    RandomAffineTransform2D(
        apply_ratio=apply_ratio,
        center_joints=[0, 2],
        target_joints=np.arange(0, len(use_landmarks)),
        trans_range=trans_range,
        scale_range=scale_range,
        rot_range=rot_range,
        skew_range=skew_range),
    ToTensor()])

transforms_post_saffine_parts = Compose([
    # Face.
    RandomAffineTransform2D(
        apply_ratio=apply_ratio,
        center_joints=[0, 2],
        target_joints=np.arange(0, 76),
        trans_range=trans_range,
        scale_range=scale_range,
        rot_range=rot_range,
        skew_range=skew_range),
    # LHand.
    RandomAffineTransform2D(
        apply_ratio=apply_ratio,
        center_joints=np.array([0, 2, 5, 9, 13, 17])+76,
        target_joints=np.arange(76, 76+21),
        trans_range=trans_range,
        scale_range=scale_range,
        rot_range=rot_range,
        skew_range=skew_range),
    # Pose.
    RandomAffineTransform2D(
        apply_ratio=apply_ratio,
        center_joints=np.array([0, 1])+76+21,
        target_joints=np.arange(76+21, 76+21+12),
        trans_range=trans_range,
        scale_range=scale_range,
        rot_range=rot_range,
        skew_range=skew_range),
    # RHand.
    RandomAffineTransform2D(
        apply_ratio=apply_ratio,
        center_joints=np.array([0, 2, 5, 9, 13, 17])+76+21+12,
        target_joints=np.arange(76+21+12, 76+21+12+21),
        trans_range=trans_range,
        scale_range=scale_range,
        rot_range=rot_range,
        skew_range=skew_range),
    ToTensor()])

次のコードで,前処理を適用したHDF5DatasetとDataLoaderをインスタンス化し,データを取り出します.
HDF5Dataset をインスタンス化する際に,pre_transformstransforms 引数に変数を渡してデータ拡張を有効にしています (10行目).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
batch_size = 2
feature_shape = (len(use_features), -1, len(use_landmarks))
token_shape = (1,)
merge_fn = partial(merge_padded_batch,
                   feature_shape=feature_shape,
                   token_shape=token_shape,
                   feature_padding_val=0.0,
                   token_padding_val=0)

dataset = HDF5Dataset(hdf5_files, pre_transforms=pre_transforms_wo_norm, transforms=transforms_pre_saffine_whole)

dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=merge_fn)
try:
    data = next(iter(dataloader))
    feature_origin = data["feature"]

    print(feature_origin.shape)
except Exception as inst:
    print(inst)
torch.Size([2, 2, 14, 130])

次のコードでモデルをインスタンス化して,動作チェックをします.
追跡点抽出の結果,入力追跡点数は130で,各追跡点はXY座標値を持っていますので,入力次元数は260になります.
出力次元数は単語数なので10になります.
また,Transformer層の入力次元数は64に設定し,PFFN内部の拡張次元数は256に設定しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Define model.
# in_channels: J * C (130*2=260)
#   J: use_landmarks (130)
#   C: use_channels (2)
# out_channels: 10
in_channels = len(use_landmarks) * len(use_features)
inter_channels = 64
out_channels = VOCAB
activation = "relu"
tren_num_layers = 2
tren_num_heads = 2
tren_dim_ffw = 256
tren_dropout_pe = 0.1
tren_dropout = 0.1
tren_layer_norm_eps = 1e-5
tren_norm_first = True
tren_add_bias = True
tren_add_tailnorm = True

model = TransformerEnISLR(in_channels=in_channels,
                          inter_channels=inter_channels,
                          out_channels=out_channels,
                          activation=activation,
                          tren_num_layers=tren_num_layers,
                          tren_num_heads=tren_num_heads,
                          tren_dim_ffw=tren_dim_ffw,
                          tren_dropout_pe=tren_dropout_pe,
                          tren_dropout=tren_dropout,
                          tren_layer_norm_eps=tren_layer_norm_eps,
                          tren_norm_first=tren_norm_first,
                          tren_add_bias=tren_add_bias,
                          tren_add_tailnorm=tren_add_tailnorm)
print(model)

# Sanity check.
logit = model(feature_origin)
print(logit.shape)
attw0 = model.tr_encoder.layers[0].attw.detach().cpu().numpy()
attw1 = model.tr_encoder.layers[0].attw.detach().cpu().numpy()
print(attw0.shape, attw1.shape)
TransformerEnISLR(
  (linear): Linear(in_features=260, out_features=64, bias=True)
  (activation): ReLU()
  (tr_encoder): TransformerEncoder(
    (pos_encoder): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (w_key): Linear(in_features=64, out_features=64, bias=True)
          (w_value): Linear(in_features=64, out_features=64, bias=True)
          (w_query): Linear(in_features=64, out_features=64, bias=True)
          (w_out): Linear(in_features=64, out_features=64, bias=True)
          (dropout_attn): Dropout(p=0.1, inplace=False)
        )
        (ffw): PositionwiseFeedForward(
          (w_1): Linear(in_features=64, out_features=256, bias=True)
          (w_2): Linear(in_features=256, out_features=64, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (activation): ReLU()
        )
        (dropout): Dropout(p=0.1, inplace=False)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
    )
    (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (head): GPoolRecognitionHead(
    (head): Linear(in_features=64, out_features=10, bias=True)
  )
)
torch.Size([2, 10])
(2, 2, 14, 14) (2, 2, 14, 14)

5. 学習と評価の実行

5.1 共通設定

では,実際に学習・評価を行います.
まずは,実験全体で共通して用いる設定値を次のコードで実装します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# Set common parameters.
batch_size = 32
load_into_ram = True
test_pid = 16069
num_workers = os.cpu_count()
print(f"Using {num_workers} cores for data loading.")
lr = 3e-4

epochs = 50
eval_every_n_epochs = 1
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} for computation.")

train_hdf5files = [fin for fin in hdf5_files if str(test_pid) not in fin.name]
val_hdf5files = [fin for fin in hdf5_files if str(test_pid) in fin.name]
test_hdf5files = [fin for fin in hdf5_files if str(test_pid) in fin.name]

_, use_landmarks = get_fullbody_landmarks()
use_features = ["x", "y"]
Using 2 cores for data loading.
Using cuda for computation.

5.2 学習・評価の実行

次のコードで学習・バリデーション・評価処理それぞれのためのDataLoaderクラスを作成します.
今回は,データ拡張処理の有無および種類による認識性能の違いを見たいので,実験毎にデータセットクラスをインスタンス化します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# Build dataloaders.
train_dataset = HDF5Dataset(
    train_hdf5files,
    pre_transforms=pre_transforms_w_norm,
    transforms=transforms_default,
    load_into_ram=load_into_ram)
val_dataset = HDF5Dataset(
    val_hdf5files,
    pre_transforms=pre_transforms_w_norm,
    transforms=transforms_default,
    load_into_ram=load_into_ram)
test_dataset = HDF5Dataset(
    test_hdf5files,
    pre_transforms=pre_transforms_w_norm,
    transforms=transforms_default,
    load_into_ram=load_into_ram)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=1, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)

次のコードでモデルをインスタンス化します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
tren_norm_first = True
tren_add_tailnorm = True

model_default = TransformerEnISLR(
    in_channels=in_channels,
    inter_channels=inter_channels,
    out_channels=out_channels,
    activation=activation,
    tren_num_layers=tren_num_layers,
    tren_num_heads=tren_num_heads,
    tren_dim_ffw=tren_dim_ffw,
    tren_dropout_pe=tren_dropout_pe,
    tren_dropout=tren_dropout,
    tren_layer_norm_eps=tren_layer_norm_eps,
    tren_norm_first=tren_norm_first,
    tren_add_bias=tren_add_bias,
    tren_add_tailnorm=tren_add_tailnorm)
print(model_default)

loss_fn = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.Adam(model_default.parameters(), lr=lr)
TransformerEnISLR(
  (linear): Linear(in_features=260, out_features=64, bias=True)
  (activation): ReLU()
  (tr_encoder): TransformerEncoder(
    (pos_encoder): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (w_key): Linear(in_features=64, out_features=64, bias=True)
          (w_value): Linear(in_features=64, out_features=64, bias=True)
          (w_query): Linear(in_features=64, out_features=64, bias=True)
          (w_out): Linear(in_features=64, out_features=64, bias=True)
          (dropout_attn): Dropout(p=0.1, inplace=False)
        )
        (ffw): PositionwiseFeedForward(
          (w_1): Linear(in_features=64, out_features=256, bias=True)
          (w_2): Linear(in_features=256, out_features=64, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (activation): ReLU()
        )
        (dropout): Dropout(p=0.1, inplace=False)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
    )
    (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (head): GPoolRecognitionHead(
    (head): Linear(in_features=64, out_features=10, bias=True)
  )
)

次のコードで学習・評価処理を行います.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Train, validation, and evaluation.
model_default.to(device)

train_losses = []
val_losses = []
test_accs = []
print("Start training.")
for epoch in range(epochs):
    print("-" * 80)
    print(f"Epoch {epoch+1}")

    train_losses = train_loop(train_dataloader, model_default, loss_fn, optimizer, device)
    val_loss = val_loop(val_dataloader, model_default, loss_fn, device)
    val_losses.append(val_loss)

    if (epoch+1) % eval_every_n_epochs == 0:
        acc = test_loop(test_dataloader, model_default, device)
        test_accs.append(acc)
train_losses_default = np.array(train_losses)
val_losses_default = np.array(val_losses)
test_accs_default = np.array(test_accs)

print(f"Minimum validation loss:{val_losses_default.min()} at {np.argmin(val_losses_default)+1} epoch.")
print(f"Maximum accuracy:{test_accs_default.max()} at {np.argmax(test_accs_default)*eval_every_n_epochs+1} epoch.")
Start training.
--------------------------------------------------------------------------------
Epoch 1
Start training.
loss:3.558914 [    0/ 3881]
loss:1.855397 [ 3200/ 3881]
Done. Time:6.237153717000012
Training performance: 
 Avg loss:2.108112

Start validation.
Done. Time:0.24819224300000542
Validation performance: 
 Avg loss:1.865472

Start evaluation.
Done. Time:1.2701316250000332
Test performance: 
 Accuracy:40.0%
--------------------------------------------------------------------------------
...
--------------------------------------------------------------------------------
Epoch 50
Start training.
loss:0.032988 [    0/ 3881]
loss:0.406723 [ 3200/ 3881]
Done. Time:2.9173141680000754
Training performance: 
 Avg loss:0.185274

Start validation.
Done. Time:0.32901669600005334
Validation performance: 
 Avg loss:0.968079

Start evaluation.
Done. Time:1.588219974000026
Test performance: 
 Accuracy:75.0%
Minimum validation loss:0.7963169728006635 at 34 epoch.
Maximum accuracy:81.5 at 49 epoch.

以後,同様の処理を設定毎に繰り返します.
コード構成は同じですので,ここでは説明を割愛させていただきます. また,この後グラフ等の描画も行っておりますが,本記事の主要点ではないため説明を割愛させていただきます.


5.3 実験結果

認識性能の比較結果を図8に示します.
こちらは図7の再掲図です.

図8 (再掲): 認識性能比較結果
認識性能比較結果

横軸は学習・評価ループの繰り返し数 (Epoch) を示します.
縦軸はそれぞれの評価指標を示します.

各線の色と実験条件の関係は次のとおりです.

  • 青線 (Default): Pre-LN構成のTransformer
  • 橙線 (+ Pre-W): 正規化前に全身に適用
  • 緑線 (+ Post-W): 正規化後に全身に適用
  • 赤線 (+ Post-P): 正規化後に部位毎に適用

デフォルトのモデルには,第九回の記事で紹介した,Pre-LN構成のTransformerモデルを用います.

青線と他の線の比較結果から,データ拡張を適用した場合はロスの値,認識性能ともに改善していることが分かります.
今回の実験では,正規化前に全身に適用した場合 (Pre-W) が最も安定した学習になっているようです.
他の方法は Pre-W よりも複雑な変換をするため,性能を引き出すにはより細かなパラメータ調整が必要であったかもしれません.


今回はアフィン変換を適用して追跡点系列を変形することで,データ拡張を行う手法を紹介しましたが,如何でしたでしょうか?
アフィン変換は標準的なデータ拡張なのですが,パラメータが多く調整に少し苦労しますね (^^;).

今回紹介した話が,これから手話認識を勉強してみようとお考えの方に何か参考になれば幸いです.