手話認識入門 補足 - 深掘りTransformer4: 欠損値の補間は認識性能を向上させるのか?

This image is generated with ChatGPT-4 Omni, and edited by the author.
作成日:2024年07月20日(土) 00:00
最終更新日:2024年10月24日(木) 18:25
カテゴリ:手話言語処理
タグ:  手話認識入門 孤立手話単語認識 Transformer 線形補間 Python

追跡点の欠損値を補間した場合,認識性能が向上するのかを実験しました.

こんにちは.高山です.
今回は,第九回の記事,および第十六回の記事の補足になります.

第十六回の記事では,手話中の追跡点系列をマスキングして劣化させたデータを学習することで,認識モデルの頑健性を向上させるアプローチを紹介しました.

今回はそれの対になる手法,つまり追跡点の欠損値を補間して綺麗にしたデータを学習させたら,認識性能が向上するのかを実験してみました.

図1に線形補間を用いて欠損値を埋めた例を示します.

(a): 線形補間前
(b): 線形補間後
(c): 線形補間と手首の位置合わせ後
線形補間の適用例

図1(b) は線形補間後の追跡点で,図1(c) は更に手首の位置合わせを行っています.

如何でしょう.
適用例を見ると,動きの自然さが増しており認識性能も向上しそうな気がしませんか?

結論から先に申し上げると,少なくとも今回のタスク (GISLRデータセットを用いた孤立手話単語認識),およびモデル (Transformer-Encoder) で試した限りでは,認識性能は向上しませんでした.

まず手首の位置合わせは,正規化の過程でキャンセルされてしまうので認識性能に影響しません.
では線形補間はどうかというと,欠落フレームが無くなる分アテンションが全フレームを満遍なく見るようになります.
結果としてアテンションが有効なフレームを抽出する能力が落ちてしまい,認識性能が上がらなくなっているように感じました.

上記のような結果でしたので改善手法として紹介するのは諦めました.
が,お蔵入りにするのも勿体ないので (^^;),補足記事としてこちらに残したいと思います.
何か参考になれば幸いです.

今回解説するスクリプトはGitHub上に公開しています
複数の実験を行っている都合で,CPUで動かした場合は結構時間がかるのでご注意ください.

更新履歴 (大きな変更のみ記載しています)

  • 2024/09/18: カテゴリを変更しました
  • 2024/09/17: タグを更新しました

1. 線形補間の処理工程

図2に,追跡点系列の線形補間処理工程を示します.

線形補間の処理工程を示すブロック図と,特徴マップがどのように変化するかを示す画像です.画像の後に説明があります.
線形補間処理工程

今回はパラメータ等も無いので,単純に処理を適用するだけです.
重要なのは処理の効果で,図2の特徴マップに示すように,線形補間を適用することで欠落していた特徴量を穴埋めすることができます.

2. 実験結果

第2節以降では,いつも通り実装の紹介をしながら実験結果をお見せします.
コード紹介記事の方針として記事単体で全処理が分かるように書いており,少し長いので結果を先にお見せしたいと思います.

2.1 頻度の多い10単語を学習させた結果

図3および図4は,線形変換の有無によるValidation Lossと認識率の推移を示しています.
線形補間は正規化などと同様に,固定の前処理として導入することが多いです.
今回は挙動を比較,解析するために学習とテストのどちらかだけに適用した場合や,データ拡張のようにランダムに適用するケースも実施しました.

まず図3に,テスト時に補間を行わない場合の結果を示します.

10単語のデータセットを用いて,テスト時は補間をしなかった場合の,認識性能比較結果を図示したグラフです.画像前後の文章に詳細説明があります.
認識性能比較結果 (Top10,テスト時補間無し)

横軸は学習・評価ループの繰り返し数 (Epoch) を示します.
縦軸はそれぞれの評価指標を示します.

各線の色と実験条件の関係は次のとおりです.

  • 青線 (Default): Pre-LN構成のTransformer,学習時補間無し
  • 橙線 (+ T-Interp (0.5)): 学習時補間有り (50%)
  • 緑線 (+ T-Interp (1.0)): 学習時補間有り (100%)

デフォルトのモデルには,第九回の記事で紹介した,Pre-LN構成のTransformerモデルを用います.

実験結果から,学習時に線形補間をした場合 (緑線) の性能が最も悪いことが分かります.
学習時とテスト時の処理が違うのでこれは自然な結果と言えます.

次に図4に,テスト時に補間を行う場合の結果を示します.

10単語のデータセットを用いて,テスト時の補間をした場合の,認識性能比較結果を図示したグラフです.画像前後の文章に詳細説明があります.
認識性能比較結果 (Top10,テスト時補間有り)

横軸と縦軸が示す内容,および各線の色と実験条件の関係は図3と同様です.

当然ながら,図4の結果は図3と逆の関係になっており,学習時に線形補間を行わないケース (青線) が最も悪い認識性能になっています.

ここで注目したいのが,最終的に達成している認識性能です.
図3の青線と図4の緑線を比べるとどちらも 80% 程度で収束しており,明確な性能差が無いことが分かります.
図3と図4の双方で,橙線が性能を保っていることは少し興味深いですが,性能が向上すると言い切れる結果は得られませんでした.

2.2 250単語を学習させた結果

データが少なくて学習が不安定になっている可能性がありますので,全データ (250単語) を学習させた場合の挙動を図5と図6に示します.
なお,こちらの実験はメモリや処理時間の都合でColab上では実行が難しいので,ローカル環境で行いました.

データの分割方法は10単語のときと同じです.
ただし,学習時間を短縮するためにバッチ数は256に設定しています.
(本来はバッチ数を変えた場合は学習率も調整した方が良いのですが,今回はママで実験を行っています)

250単語のデータセットを用いて,テスト時は補間をしなかった場合の,認識性能比較結果を図示したグラフです.画像前後の文章に詳細説明があります.
認識性能比較結果 (全単語,テスト時補間無し)
250単語のデータセットを用いて,テスト時に補間をした場合の,認識性能比較結果を図示したグラフです.画像前後の文章に詳細説明があります.
認識性能比較結果 (全単語,テスト時補間有り)

横軸と縦軸が示す内容,および各線の色と実験条件の関係は図3と同様です.

実験結果の傾向は 10単語を用いた場合と同様です
学習とテストの処理が一致していないケースでは性能が悪くなります.
また,処理が一致しているケースおよびデータ拡張として線形補間を用いたケースも,最終的な性能差はほとんどありませんでした.

なお,今回の実験では話を簡単にするために,実験条件以外のパラメータは固定にし,乱数の制御もしていません.
必ずしも同様の結果になるわけではないので,ご了承ください.

2.3 Self-attentionの挙動

最後に,図7と図8に Self-attention 層の挙動を示します.
図7は学習時に線形補間を用いなかったモデルの挙動を示しています.

学習時に補間をしなかった場合のSelf-attention層の出力を描いています.画像の後に説明があります.
Self-attentionの挙動: 学習時補間無し

図7(a) はテスト時に補間を用いなかった場合の Self-attention の重みを示しています.
追跡成功フレームにだけ高い重みを与えて,失敗フレームの影響は Self-attention 内で除去していることが分かります.

図7(b) はテスト時に補間を用いた場合の Self-attention の重みを示しています.
欠損フレームが補間された結果,Self-attention はフレーム全体を満遍なく見るような挙動に変わっています.
この挙動自体は自然なことですが,学習時とギャップがあるため認識性能が劣化する要因になっていると予想できます.

図8は学習時に線形補間を用いたモデルの挙動を示しています.

学習時に補間をした場合のSelf-attention層の出力を描いています.画像の後に説明があります.
Self-attentionの挙動: 学習時補間有り

図8(a) はテスト時に補間を用いなかった場合の Self-attention の重みを示しています.
こちらのモデルは全フレームを満遍なく見るように学習している傾向があり,欠損フレーム箇所にも重みを与えています.
欠損フレームの特徴量を取り込んでしまうため,認識性能が劣化する要因になっていると予想できます.

図8(b) はテスト時に補間を用いた場合の Self-attention の重みを示しています.
全体を満遍なく見る傾向は同様ですが,図7(b) に比べて重要な箇所にフォーカスできているように感じます.

ここまで見てきたように,欠損値が含まれる場合,補間した場合双方で Transformer は適切に対処して認識をしてくれます.
学習時とテスト時で一貫した処理を行うことが重要で,補間の有無は Transformer がカバーしてくれることから大きな差にはならないことが分かりました.

3. 前準備

3.1 データセットのダウンロード

ここからは実装方法の説明をしていきます.
まずは,前準備としてGoogle Colabにデータセットをアップロードします. ここの工程はこれまでの記事と同じですので,既に行ったことのある方は第3.3項まで飛ばしていただいて構いません.

まず最初に,データセットの格納先からデータをダウンロードし,ご自分のGoogle driveへアップロードしてください.

次のコードでGoogle driveをColabへマウントします.
Google Driveのマウント方法については,補足記事にも記載してあります.

1
2
3
from google.colab import drive

drive.mount("/content/drive")

ドライブ内のファイルをColabへコピーします.
パスはアップロード先を設定する必要があります.

# Copy to local.
!cp [path_to_dataset]/gislr_dataset_top10.zip gislr_top10.zip

データセットはZIP形式になっているので unzip コマンドで解凍します.

!unzip gislr_top10.zip
Archive:  gislr_top10.zip
   creating: dataset_top10/
  inflating: dataset_top10/16069.hdf5
  ...
  inflating: dataset_top10/sign_to_prediction_index_map.json

成功すると dataset_top10 以下にデータが解凍されます.
HDF5ファイルはデータ本体で,手話者毎にファイルが別れています.
JSONファイルは辞書ファイルで,TXTファイルは本データセットのライセンスです.

!ls dataset_top10
16069.hdf5  25571.hdf5  29302.hdf5  36257.hdf5  49445.hdf5  62590.hdf5
18796.hdf5  26734.hdf5  30680.hdf5  37055.hdf5  53618.hdf5  LICENSE.txt
2044.hdf5   27610.hdf5  32319.hdf5  37779.hdf5  55372.hdf5  sign_to_prediction_index_map.json
22343.hdf5  28656.hdf5  34503.hdf5  4718.hdf5   61333.hdf5

単語辞書には単語名と数値の関係が10単語分定義されています.

!cat dataset_top10/sign_to_prediction_index_map.json
{
    "listen": 0,
    "look": 1,
    "shhh": 2,
    "donkey": 3,
    "mouse": 4,
    "duck": 5,
    "uncle": 6,
    "hear": 7,
    "pretend": 8,
    "cow": 9
}

ライセンスはオリジナルと同様に,CC-BY 4.0 としています.

!cat dataset_top10/LICENSE.txt
The dataset provided by Natsuki Takayama (Takayama Research and Development Office) is licensed under CC-BY 4.0.
Author: Copyright 2024 Natsuki Takayama
Title: GISLR Top 10 dataset
Original licenser: Deaf Professional Arts Network and the Georgia Institute of Technology
Modification
- Extract 10 most frequent words.
- Packaged into HDF5 format.

次のコードでサンプルを確認します.
サンプルは辞書型のようにキーバリュー形式で保存されており,下記のように階層化されています.

- サンプルID (トップ階層のKey)
  |- feature: 入力特徴量で `[C(=3), T, J(=543)]` 形状.C,T,Jは,それぞれ特徴次元,フレーム数,追跡点数です.
  |- token: 単語ラベル値で `[1]` 形状.0から9の数値です.
1
2
3
4
5
6
7
8
9
with h5py.File("dataset_top10/16069.hdf5", "r") as fread:
    keys = list(fread.keys())
    print(keys)
    group = fread[keys[0]]
    print(group.keys())
    feature = group["feature"][:]
    token = group["token"][:]
    print(feature.shape)
    print(token)
['1109479272', '11121526', ..., '976754415']
<KeysViewHDF5 ['feature', 'token']>
(3, 23, 543)
[1]

3.2 モジュールのダウンロード

次に,過去の記事で実装したコードをダウンロードします.
本項は前回までに紹介した内容と同じですので,飛ばしていただいても構いません. コードはGithubのsrc/modules_gislrにアップしてあります (今後の記事で使用するコードも含まれています).

まず,下記のコマンドでレポジトリをダウンロードします.
(目的のディレクトリだけダウンロードする方法はまだ調査中です(^^;))

!wget https://github.com/takayama-rado/trado_samples/archive/master.zip
--2024-01-21 11:01:47--  https://github.com/takayama-rado/trado_samples/archive/master.zip
Resolving github.com (github.com)... 140.82.112.3
...
2024-01-21 11:01:51 (19.4 MB/s) - ‘master.zip’ saved [75710869]

ダウンロードしたリポジトリを解凍します.

!unzip -o master.zip -d master
Archive:  master.zip
641b06a0ca7f5430a945a53b4825e22b5f3b8eb6
   creating: master/trado_samples-main/
  inflating: master/trado_samples-main/.gitignore
  ...

モジュールのディレクトリをカレントディレクトリに移動します.

!mv master/trado_samples-main/src/modules_gislr .

他のファイルは不要なので削除します.

!rm -rf master master.zip gislr_top10.zip
!ls
dataset_top10 drive modules_gislr  sample_data

3.3 モジュールのロード

主要な処理の実装に先立って,下記のコードでモジュールをロードします.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import json
import os
import random
import sys
from functools import partial
from pathlib import Path
from typing import (
    Dict
)

# Third party's modules
import numpy as np

import torch
from torch.utils.data import (
    DataLoader)

from torchvision.transforms import Compose

# Local modules
sys.path.append("modules_gislr")
from modules_gislr.dataset import (
    HDF5Dataset,
    merge_padded_batch)
from modules_gislr.defines import (
    get_fullbody_landmarks
)
from modules_gislr.layers import (
    TransformerEnISLR
)
from modules_gislr.train_functions import (
    test_loop,
    val_loop,
    train_loop
)
from modules_gislr.transforms import (
    PartsBasedNormalization,
    ReplaceNan,
    SelectLandmarksAndFeature,
    ToTensor,
    matrix_interp
)
【コード解説】
- 標準モジュール
  - json: JSONファイル制御ライブラリ.辞書ファイルのロードに使用します.
  - os: システム処理ライブラリ
  - random: ランダム値生成ライブラリ
  - sys: Pythonインタプリタの制御ライブラリ.
    今回はローカルモジュールに対してパスを通すために使用します.
  - functools: 関数オブジェクトを操作するためのライブラリ.
    今回はDataLoaderクラスに渡すパディング関数に対して設定値をセットするために使用します.
  - pathlib.Path: オブジェクト指向のファイルシステム機能.
    主にファイルアクセスに使います.osモジュールを使っても同様の処理は可能です.
    高山の好みでこちらのモジュールを使っています(^^;).
  - typing: 関数などに型アノテーションを行う機能.
    ここでは型を忘れやすい関数に付けていますが,本来は全てアノテーションをした方が良いでしょう(^^;).
- 3rdパーティモジュール
  - numpy: 行列演算ライブラリ
  - torch: ニューラルネットワークライブラリ
  - torchvision: PyTorchと親和性が高い画像処理ライブラリ.
    今回はDatasetクラスに与える前処理をパッケージするために用います.
- ローカルモジュール: sys.pathにパスを追加することでロード可能
  - dataset: データセット操作用モジュール
  - defines: 各部位の追跡点,追跡点間の接続関係,およびそれらへのアクセス処理を
    定義したモジュール
  - layers: ニューラルネットワークのモデルやレイヤモジュール
  - train_functions: 学習・評価処理モジュール
  - transforms: 入出力変換処理モジュール

4. 線形補間処理の実装

本節では線形補間処理を実装します.
線形補間処理に関してはこちらの記事でも解説してますので,ご一読いただけましたらうれしいです.
線形補間処理は前処理に実装しますので,データ拡張と同様にクラスを定義して呼び出せるように実装したいと思います.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class PartsBasedTemporalInterpolation():
    """Apply parts-based temporal interpolation.
    """

    def __init__(self,
                 apply_ratio=1.0,
                 face_head=0,
                 face_num=76,
                 lhand_head=76,
                 lhand_num=21,
                 pose_head=76+21,
                 pose_num=12,
                 rhand_head=76+21+12,
                 rhand_num=21,
                 accept_correct_ratio=0.1) -> None:
        self.apply_ratio = apply_ratio
        self.face_head = face_head
        self.face_num = face_num
        self.lhand_head = lhand_head
        self.lhand_num = lhand_num
        self.pose_head = pose_head
        self.pose_num = pose_num
        self.rhand_head = rhand_head
        self.rhand_num = rhand_num
        self.accept_correct_ratio = accept_correct_ratio

    def _gen_tmask(self, feature):
        tmask = feature == 0.0
        tmask = np.all(tmask, axis=(0, 2))
        tmask = np.logical_not(tmask)
        return tmask

    def _interp(self, feature):
        tmask = self._gen_tmask(feature)
        # `[C, T, J]`
        orig_shape = feature.shape
        tlength = orig_shape[1]
        # No failed tracking.
        if tmask.sum() == tlength:
            return feature
        # Too many failed.
        if tmask.sum() < self.accept_correct_ratio:
            return feature
        # All failed.
        if tmask.sum() == 0.0:
            return feature

        x = np.arange(tlength)
        xs = np.where(tmask != 0)[0]
        # `[C, T, J] -> [T, C*J]`
        feature = feature.transpose([1, 0, 2]).reshape([tlength, -1])
        ys = feature[xs, :]

        newfeature = matrix_interp(x, xs, ys)
        # `[T, C*J] -> [T, C, J] -> [C, T, J]`
        newfeature = newfeature.reshape([-1, orig_shape[0], orig_shape[2]])
        newfeature = newfeature.transpose([1, 0, 2])
        assert np.isnan(newfeature).any() == np.False_
        assert np.isinf(newfeature).any() == np.False_
        return newfeature

    def __call__(self,
                 data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute interpolation.
        """
        if random.random() > self.apply_ratio:
            return data

        feature = data["feature"]
        if self.face_num > 0:
            face = feature[:, :, self.face_head: self.face_head+self.face_num]
            face = self._interp(face)
            feature[:, :, self.face_head: self.face_head+self.face_num] = face
        if self.lhand_num > 0:
            lhand = feature[:, :, self.lhand_head: self.lhand_head+self.lhand_num]
            lhand = self._interp(lhand)
            feature[:, :, self.lhand_head: self.lhand_head+self.lhand_num] = lhand
        if self.pose_num > 0:
            pose = feature[:, :, self.pose_head: self.pose_head+self.pose_num]
            pose = self._interp(pose)
            feature[:, :, self.pose_head: self.pose_head+self.pose_num] = pose
        if self.rhand_num > 0:
            rhand = feature[:, :, self.rhand_head: self.rhand_head+self.rhand_num]
            rhand = self._interp(rhand)
            feature[:, :, self.rhand_head: self.rhand_head+self.rhand_num] = rhand
        data["feature"] = feature
        return data

    def __str__(self):
        return f"{self.__class__.__name__}:{self.__dict__}"
【コード解説】
- 引数
  - apply_ratio: データ拡張の適用確率.
  - face_head: 顔追跡点の先頭インデクス.
  - face_num: 顔追跡点数.
    `face_num <= 0` の場合は補間対象外になります.
  - lhand_head: 左手の先頭インデクス.
  - lhand_num: 左手追跡点点数.
    `lhand_num <= 0` の場合は補間対象外になります.
  - pose_head: 身体追跡点の先頭インデクス.
  - pose_num: 身体追跡点追跡点数.
    `pose_num <= 0` の場合は補間対象外になります.
  - rhand_head: 右手追跡点の先頭インデクス.
  - rhand_num: 右手追跡点数.
    `rhand_num <= 0` の場合は補間対象外になります.
  - accept_correct_ratio: 補間を行う最低追跡成功フレーム率.
    追跡成功フレーム率が `accept_correct_ratio` 未満の場合は,線形補間をスキップ
    します.
- 16-25行目: 初期化処理
- 27-31行目: マスク配列生成関数
  欠損フレーム箇所を示すマスク配列を生成します.
- 33-60行目: 線形補間関数
  - 34-46行目: マスク配列を生成して,線形補間対象とするか判定.
    追跡に全て成功/失敗している場合,および追跡成功フレーム率が `accept_correct_ratio`
    を下回る場合は,線形補間をスキップします.
  - 48-52行目: 補間処理の基準点と特徴量を算出し,線形補間の並列処理のために,
    特徴量の形状を `[C, T, J] -> [T, C*J]` に変更.
  - 54行目: 線形補間の適用.
  - 56-59行目: 特徴量形状を `[T, C*J] -> [C, T, J]` に変更.
- 62-87行目: 線形補間処理
  - 66-67行目: ランダムに $[0, 1]$ の値を取得し,`apply_ratio` より上の値の場合は何もしない
  - 70-87行目: 部位毎に線形補間を実行
- 89-90行目: print()に対して,クラス名と設定値を返す.

5. 認識モデルの動作確認

今回は,第九回の記事で紹介した,Pre-LN構成のTransformerモデルをそのまま用いて実験を行います.
ここではモデルの推論動作が正常に動くかだけ確かめます.

次のコードでデータセットからHDF5ファイルとJSONファイルのパスを読み込みます.

1
2
3
4
5
6
7
8
# Access check.
dataset_dir = Path("dataset_top10")
files = list(dataset_dir.iterdir())
dictionary = [fin for fin in files if ".json" in fin.name][0]
hdf5_files = [fin for fin in files if ".hdf5" in fin.name]

print(dictionary)
print(hdf5_files)
dataset_top10/sign_to_prediction_index_map.json
[PosixPath('dataset_top10/2044.hdf5'), ..., PosixPath('dataset_top10/25571.hdf5')]

次のコードで辞書ファイルをロードして,認識対象の単語数を格納します.

1
2
3
4
5
# Load dictionary.
with open(dictionary, "r") as fread:
    key2token = json.load(fread)

VOCAB = len(key2token)

次のコードで前処理を定義します.
固定の前処理には,以前に説明した追跡点の選定と,追跡点の正規化を適用して実験を行います.

線形補間処理は固定の前処理に実装することが多いのですが,今回はデータ拡張として用いた場合も試すので動的な前処理として実装します.
設定毎に,それぞれ transforms_tinterp_00 (11-13行目,線形補間無し),transforms_tinterp_05 (15-17行目,線形補間を50%の確率で適用),transforms_tinterp_10 (19-21行目,線形補間を100%適用) に定義しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
_, use_landmarks = get_fullbody_landmarks()
use_features = ["x", "y"]
trans_select_feature = SelectLandmarksAndFeature(landmarks=use_landmarks, features=use_features)
trans_repnan = ReplaceNan()
trans_norm = PartsBasedNormalization(align_mode="framewise", scale_mode="unique")

pre_transforms = Compose([trans_select_feature,
                          trans_repnan,
                          trans_norm])

transforms_tinterp_00 = Compose([
    PartsBasedTemporalInterpolation(apply_ratio=0.0),
    ToTensor()])

transforms_tinterp_05 = Compose([
    PartsBasedTemporalInterpolation(apply_ratio=0.5),
    ToTensor()])

transforms_tinterp_10 = Compose([
    PartsBasedTemporalInterpolation(apply_ratio=1.0),
    ToTensor()])

次のコードで,前処理を適用したHDF5DatasetとDataLoaderをインスタンス化し,データを取り出します.
HDF5Dataset をインスタンス化する際に,pre_transformstransforms 引数に変数を渡してデータ拡張を有効にしています (11行目).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
batch_size = 2
feature_shape = (len(use_features), -1, len(use_landmarks))
token_shape = (1,)
merge_fn = partial(merge_padded_batch,
                   feature_shape=feature_shape,
                   token_shape=token_shape,
                   feature_padding_val=0.0,
                   token_padding_val=0)

for trans in [transforms_tinterp_00, transforms_tinterp_05, transforms_tinterp_10]:
    dataset = HDF5Dataset(hdf5_files, pre_transforms=pre_transforms, transforms=trans)
    dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=merge_fn)
    try:
        data = next(iter(dataloader))
        feature_origin = data["feature"]

        print(feature_origin.shape)
    except Exception as inst:
        print(inst)
torch.Size([2, 2, 9, 130])
torch.Size([2, 2, 9, 130])
torch.Size([2, 2, 9, 130])

次のコードでモデルをインスタンス化して,動作チェックをします.
追跡点抽出の結果,入力追跡点数は130で,各追跡点はXY座標値を持っていますので,入力次元数は260になります.
出力次元数は単語数なので10になります.
また,Transformer層の入力次元数は64に設定し,PFFN内部の拡張次元数は256に設定しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# Define model.
# in_channels: J * C (130*2=260)
#   J: use_landmarks (130)
#   C: use_channels (2)
# out_channels: 10
in_channels = len(use_landmarks) * len(use_features)
inter_channels = 64
out_channels = VOCAB
activation = "relu"
tren_num_layers = 2
tren_num_heads = 2
tren_dim_ffw = 256
tren_dropout_pe = 0.1
tren_dropout = 0.1
tren_layer_norm_eps = 1e-5
tren_norm_first = True
tren_add_bias = True
tren_add_tailnorm = True

pooling_type = "none"

model = TransformerEnISLR(in_channels=in_channels,
                          inter_channels=inter_channels,
                          out_channels=out_channels,
                          activation=activation,
                          pooling_type=pooling_type,
                          tren_num_layers=tren_num_layers,
                          tren_num_heads=tren_num_heads,
                          tren_dim_ffw=tren_dim_ffw,
                          tren_dropout_pe=tren_dropout_pe,
                          tren_dropout=tren_dropout,
                          tren_layer_norm_eps=tren_layer_norm_eps,
                          tren_norm_first=tren_norm_first,
                          tren_add_bias=tren_add_bias,
                          tren_add_tailnorm=tren_add_tailnorm)
print(model)

# Sanity check.
logit = model(feature_origin)
print(logit.shape)
attw0 = model.tr_encoder.layers[0].attw.detach().cpu().numpy()
attw1 = model.tr_encoder.layers[0].attw.detach().cpu().numpy()
print(attw0.shape, attw1.shape)
TransformerEnISLR(
  (linear): Linear(in_features=260, out_features=64, bias=True)
  (activation): ReLU()
  (pooling): Identity()
  (tr_encoder): TransformerEncoder(
    (pos_encoder): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (w_key): Linear(in_features=64, out_features=64, bias=True)
          (w_value): Linear(in_features=64, out_features=64, bias=True)
          (w_query): Linear(in_features=64, out_features=64, bias=True)
          (w_out): Linear(in_features=64, out_features=64, bias=True)
          (dropout_attn): Dropout(p=0.1, inplace=False)
        )
        (ffw): PositionwiseFeedForward(
          (w_1): Linear(in_features=64, out_features=256, bias=True)
          (w_2): Linear(in_features=256, out_features=64, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (activation): ReLU()
        )
        (dropout): Dropout(p=0.1, inplace=False)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
    )
    (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (head): GPoolRecognitionHead(
    (head): Linear(in_features=64, out_features=10, bias=True)
  )
)
torch.Size([2, 10])
(2, 2, 9, 9) (2, 2, 9, 9)

6. 学習と評価

6.1 共通設定

では,実際に学習・評価を行います.
まずは,実験全体で共通して用いる設定値を次のコードで実装します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# Set common parameters.
batch_size = 32
load_into_ram = True
test_pid = 16069
num_workers = os.cpu_count()
print(f"Using {num_workers} cores for data loading.")
lr = 3e-4

epochs = 50
eval_every_n_epochs = 1
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} for computation.")

train_hdf5files = [fin for fin in hdf5_files if str(test_pid) not in fin.name]
val_hdf5files = [fin for fin in hdf5_files if str(test_pid) in fin.name]
test_hdf5files = [fin for fin in hdf5_files if str(test_pid) in fin.name]

_, use_landmarks = get_fullbody_landmarks()
use_features = ["x", "y"]
Using 2 cores for data loading.
Using cuda for computation.

次のコードで学習・バリデーション・評価処理それぞれのためのDataLoaderクラスを作成します.
今回は,データ拡張処理の種類による認識性能の違いを見たいので,先に必要なデータセットクラスをインスタンス化します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Build dataloaders.
train_dataset_tinterp_00 = HDF5Dataset(
    train_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_00,
    load_into_ram=load_into_ram)

train_dataset_tinterp_05 = HDF5Dataset(
    train_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_05,
    load_into_ram=load_into_ram)

train_dataset_tinterp_10 = HDF5Dataset(
    train_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_10,
    load_into_ram=load_into_ram)

val_dataset_tinterp_00 = HDF5Dataset(
    val_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_00,
    load_into_ram=load_into_ram)
test_dataset_tinterp_00 = HDF5Dataset(
    test_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_00,
    load_into_ram=load_into_ram)

val_dataset_tinterp_10 = HDF5Dataset(
    val_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_10,
    load_into_ram=load_into_ram)
test_dataset_tinterp_10 = HDF5Dataset(
    test_hdf5files,
    pre_transforms=pre_transforms,
    transforms=transforms_tinterp_10,
    load_into_ram=load_into_ram)

train_dataloader_tinterp_00 = DataLoader(train_dataset_tinterp_00, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=True)
train_dataloader_tinterp_05 = DataLoader(train_dataset_tinterp_05, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=True)
train_dataloader_tinterp_10 = DataLoader(train_dataset_tinterp_10, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=True)

val_dataloader_tinterp_00 = DataLoader(val_dataset_tinterp_00, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)
test_dataloader_tinterp_00 = DataLoader(test_dataset_tinterp_00, batch_size=1, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)

val_dataloader_tinterp_10 = DataLoader(val_dataset_tinterp_10, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)
test_dataloader_tinterp_10 = DataLoader(test_dataset_tinterp_10, batch_size=1, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)

学習用データセットは設定毎に,それぞれ train_dataloader_tinterp_00 (線形補間無し),train_dataloader_tinterp_05 (線形補間を50%の確率で適用),train_dataloader_tinterp_10 (線形補間を100%適用) に定義しています.
バリデーション用とテスト用は,線形補間の有無に応じて2種類ずつ定義しています.

6.2 学習・評価の実行

次のコードでモデルをインスタンス化します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
tren_norm_first = True
tren_add_tailnorm = True
pooling_type = "none"

model_tinterp_00 = TransformerEnISLR(
    in_channels=in_channels,
    inter_channels=inter_channels,
    out_channels=out_channels,
    activation=activation,
    pooling_type=pooling_type,
    tren_num_layers=tren_num_layers,
    tren_num_heads=tren_num_heads,
    tren_dim_ffw=tren_dim_ffw,
    tren_dropout_pe=tren_dropout_pe,
    tren_dropout=tren_dropout,
    tren_layer_norm_eps=tren_layer_norm_eps,
    tren_norm_first=tren_norm_first,
    tren_add_bias=tren_add_bias,
    tren_add_tailnorm=tren_add_tailnorm)
print(model_tinterp_00)

loss_fn = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.Adam(model_tinterp_00.parameters(), lr=lr)
TransformerEnISLR(
  (linear): Linear(in_features=260, out_features=64, bias=True)
  (activation): ReLU()
  (pooling): Identity()
  (tr_encoder): TransformerEncoder(
    (pos_encoder): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (w_key): Linear(in_features=64, out_features=64, bias=True)
          (w_value): Linear(in_features=64, out_features=64, bias=True)
          (w_query): Linear(in_features=64, out_features=64, bias=True)
          (w_out): Linear(in_features=64, out_features=64, bias=True)
          (dropout_attn): Dropout(p=0.1, inplace=False)
        )
        (ffw): PositionwiseFeedForward(
          (w_1): Linear(in_features=64, out_features=256, bias=True)
          (w_2): Linear(in_features=256, out_features=64, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (activation): ReLU()
        )
        (dropout): Dropout(p=0.1, inplace=False)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      )
    )
    (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (head): GPoolRecognitionHead(
    (head): Linear(in_features=64, out_features=10, bias=True)
  )
)

次のコードで学習・評価処理を行います.
ループの中で,線形補間が無い場合と有る場合双方のバリデーションとテスト処理を呼び出している点に注意してください.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Train, validation, and evaluation.
model_tinterp_00.to(device)

train_losses = []
val_losses_tinterp_00 = []
test_accs_tinterp_00 = []
val_losses_tinterp_10 = []
test_accs_tinterp_10 = []
print("Start training.")
for epoch in range(epochs):
    print("-" * 80)
    print(f"Epoch {epoch+1}")

    train_losses = train_loop(train_dataloader_tinterp_00, model_tinterp_00, loss_fn, optimizer, device)
    val_loss_tinterp_00 = val_loop(val_dataloader_tinterp_00, model_tinterp_00, loss_fn, device)
    val_losses_tinterp_00.append(val_loss_tinterp_00)
    val_loss_tinterp_10 = val_loop(val_dataloader_tinterp_10, model_tinterp_00, loss_fn, device)
    val_losses_tinterp_10.append(val_loss_tinterp_10)

    if (epoch+1) % eval_every_n_epochs == 0:
        acc_tinterp_00 = test_loop(test_dataloader_tinterp_00, model_tinterp_00, device)
        test_accs_tinterp_00.append(acc_tinterp_00)
        acc_tinterp_10 = test_loop(test_dataloader_tinterp_10, model_tinterp_00, device)
        test_accs_tinterp_10.append(acc_tinterp_10)
train_losses_tinterp_00 = np.array(train_losses)
val_losses_tinterp_00_vs_tinterp_00 = np.array(val_losses_tinterp_00)
test_accs_tinterp_00_vs_tinterp_00 = np.array(test_accs_tinterp_00)
val_losses_tinterp_00_vs_tinterp_10 = np.array(val_losses_tinterp_10)
test_accs_tinterp_00_vs_tinterp_10 = np.array(test_accs_tinterp_10)

print(f"Minimum validation loss (Train: W0-Tinterp, Test: WO-Tinterp):{val_losses_tinterp_00_vs_tinterp_00.min()} at {np.argmin(val_losses_tinterp_00_vs_tinterp_00)+1} epoch.")
print(f"Maximum accuracy (Train: W0-Tinterp, Test: WO-Tinterp):{test_accs_tinterp_00_vs_tinterp_00.max()} at {np.argmax(test_accs_tinterp_00_vs_tinterp_00)*eval_every_n_epochs+1} epoch.")

print(f"Minimum validation loss (Train: W0-Tinterp, Test: W-Tinterp):{val_losses_tinterp_00_vs_tinterp_10.min()} at {np.argmin(val_losses_tinterp_00_vs_tinterp_10)+1} epoch.")
print(f"Maximum accuracy (Train: W0-Tinterp, Test: W-Tinterp):{test_accs_tinterp_00_vs_tinterp_10.max()} at {np.argmax(test_accs_tinterp_00_vs_tinterp_10)*eval_every_n_epochs+1} epoch.")
Start training.
--------------------------------------------------------------------------------
Epoch 1
Start training.
loss:3.820244 [    0/ 3881]
loss:1.714195 [ 3200/ 3881]
Done. Time:4.108065167999996
Training performance: 
 Avg loss:2.132699

Start validation.
Done. Time:0.24950152700000672
Validation performance: 
 Avg loss:2.096732

Start validation.
Done. Time:0.3037123720000068
Validation performance: 
 Avg loss:1.996433

Start evaluation.
Done. Time:1.2043801639999856
Test performance: 
 Accuracy:24.5%
Start evaluation.
Done. Time:1.1987244710000198
Test performance: 
 Accuracy:29.5%
--------------------------------------------------------------------------------
...
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Epoch 50
Start training.
loss:0.347790 [    0/ 3881]
loss:0.156235 [ 3200/ 3881]
Done. Time:3.226144624000085
Training performance: 
 Avg loss:0.182866

Start validation.
Done. Time:0.3976408219999712
Validation performance: 
 Avg loss:0.886100

Start validation.
Done. Time:0.487653418999912
Validation performance: 
 Avg loss:1.008274

Start evaluation.
Done. Time:1.6435284930000762
Test performance: 
 Accuracy:74.5%
Start evaluation.
Done. Time:1.2237320229999114
Test performance: 
 Accuracy:74.5%
Minimum validation loss (Train: W0-Tinterp, Test: WO-Tinterp):0.6450075890336718 at 26 epoch.
Maximum accuracy (Train: W0-Tinterp, Test: WO-Tinterp):83.0 at 41 epoch.
Minimum validation loss (Train: W0-Tinterp, Test: W-Tinterp):0.7228595316410065 at 26 epoch.
Maximum accuracy (Train: W0-Tinterp, Test: W-Tinterp):78.5 at 26 epoch.

以後,同様の処理を設定毎に繰り返します.
コード構成は同じですので,ここでは説明を割愛させていただきます.
また,この後グラフ等の描画も行っておりますが,本記事の主要点ではないため説明を割愛させていただきます.


今回は,追跡点の欠損値を補間して学習した場合,認識性能が向上するのかを実験してみましたが,如何でしたでしょうか?

残念ながら認識性能は向上しませんでしたが,Self-attention の挙動変化など興味深い現象が見れました.
また,線形補間をデータ拡張的に用いた場合には,テスト時の線形変換の有無に頑健になる点も興味深いです.

今回は孤立手話単語認識に対して Transformer-Encoder を用いて実験を行いました.
連続手話単語認識などの別タスクに試した場合,またはReccurent neural network など別のアーキテクチャを用いた場合は結果が変わるかもしれません.
機会があればまた実験をしてみたいと思います.

今回紹介した話が,これから手話認識を勉強してみようとお考えの方に何か参考になれば幸いです.